Yes, I was referring to
https://github.com/apache/beam/blob/7494d1450f2fc19344c385435496634e4b28a997/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L563
since
that is responsible for scheduling the bundle finalization. It will only be
invoked if the current bundle completes.

I would add logging to ParDoEvaluator#finishBundle[1] to confirm that the
bundle is being completed.
I would also add logging to EvaluationContext#handleResult[2] to see how
the bundle completion is being handled and whether the bundle finalization
callback is being invoked.

1:
https://github.com/apache/beam/blob/7494d1450f2fc19344c385435496634e4b28a997/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java#L267
2:
https://github.com/apache/beam/blob/7494d1450f2fc19344c385435496634e4b28a997/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java#L157
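For what it's worth, here is a minimal standalone sketch of what that logging
could reveal about the call order. The classes below are illustrative
stand-ins only, not Beam's actual ParDoEvaluator or EvaluationContext
internals:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of the handshake: the added log lines show whether
// finishBundle() runs and whether the finalization callback registered for
// the bundle is actually invoked when the result is handled.
public class BundleFinalizationTrace {
  static final List<String> trace = new ArrayList<>();

  interface FinalizationCallback { void onBundleCommitted(); }

  static class Bundle {
    final List<FinalizationCallback> callbacks = new ArrayList<>();
    void requestFinalization(FinalizationCallback cb) { callbacks.add(cb); }
  }

  // Stand-in for ParDoEvaluator#finishBundle: a log entry here proves the
  // current bundle completed.
  static Bundle finishBundle(Bundle bundle) {
    trace.add("finishBundle: bundle completed");
    return bundle;
  }

  // Stand-in for EvaluationContext#handleResult: logs whether the
  // finalization callbacks attached to the committed bundle get invoked.
  static void handleResult(Bundle bundle) {
    trace.add("handleResult: committing bundle, "
        + bundle.callbacks.size() + " callback(s)");
    for (FinalizationCallback cb : bundle.callbacks) {
      cb.onBundleCommitted();
    }
  }

  public static void main(String[] args) {
    Bundle bundle = new Bundle();
    // The source wrapper would request finalization while processing elements.
    bundle.requestFinalization(() -> trace.add("finalizeCheckpoint: acks sent"));
    handleResult(finishBundle(bundle));
    trace.forEach(System.out::println);
  }
}
```

If the real logs show the first line but not the last, the bundle is
completing without its finalization callback ever firing.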

On Thu, Jun 17, 2021 at 10:42 AM Alex Koay <alexkoa...@gmail.com> wrote:

> Could you be referring to this part?
> https://github.com/apache/beam/blob/7494d1450f2fc19344c385435496634e4b28a997/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L563
>
> I've tried fiddling with it, and it gave some good results if I recall
> correctly.
> I didn't mention it earlier because I thought that maybe a shorter expiry
> actually causes the readers to expire faster and thus release the
> unacked messages for another reader to bundle up.
>
> I can confirm that CheckpointMark#finalizeCheckpoint() doesn't get
> called for at least some time if the bundle is not at (or close to) its
> maximum size.
> I've added logging into finalizeCheckpoint() and don't see it getting
> called. It's where the acknowledgements are happening.
>
> I've actually opened a Google support ticket for this as well, perhaps you
> could take a look at it (Case 28209335).
> Thanks for your reply, I'll try to debug this further too.
>
> On Fri, Jun 18, 2021 at 12:49 AM Luke Cwik <lc...@google.com> wrote:
>
>> Reducing the bundle size/timeout shouldn't be necessary since when the
>> UnboundedSource returns false from advance(), the
>> UnboundedSourceAsSDFWrapperFn will schedule a bundle finalization and
>> return resume for the process continuation. This should cause
>> invokeProcessElement() to complete in
>> OutputAndTimeBoundedSplittableProcessElementInvoker, and the
>> runner-specific implementation should finish the current bundle. This
>> will allow the runner
>> to do two things:
>> 1) Finalize the current bundle
>> 2) Schedule the continuation for the checkpoint mark
>>
>> Based upon your description it looks like for some reason the runner is
>> unable to complete the current bundle.
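[Editor's note: the advance()/resume handshake described above can be
modeled roughly as follows; all names here are illustrative stand-ins,
not Beam's real UnboundedSourceAsSDFWrapperFn signatures.]

```java
import java.util.ArrayList;
import java.util.List;

// Minimal model of the control flow: when the reader's advance() returns
// false, the SDF wrapper schedules bundle finalization (so the checkpoint
// mark is finalized once the bundle commits) and returns "resume", which
// should let the runner finish the current bundle.
public class SdfWrapperSketch {
  interface Reader { boolean advance(); }

  enum Continuation { RESUME, STOP }

  static final List<String> finalizations = new ArrayList<>();

  static Continuation processElement(Reader reader) {
    while (reader.advance()) {
      // ... output the element read by the reader ...
    }
    // No data available right now: ask the runner to finalize this bundle
    // and come back later instead of blocking.
    finalizations.add("bundle finalization scheduled");
    return Continuation.RESUME;
  }

  public static void main(String[] args) {
    Continuation c = processElement(() -> false); // a drained reader
    System.out.println(c + " / " + finalizations);
  }
}
```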
>>
>> On Thu, Jun 17, 2021 at 2:48 AM Alex Koay <alexkoa...@gmail.com> wrote:
>>
>>> Okay, I think I've found the issue, but now I need some help figuring
>>> out how to fix it.
>>>
>>> 1. Solace allows a number of unacknowledged messages before it stops
>>> sending more messages. This number just so happens to be 10k messages by
>>> default (in the queue I am using).
>>> 2. The Solace Beam transform (rightly) waits until bundle finalization
>>> before acknowledging the messages.
>>> 3. Bundle finalization doesn't happen until it either reaches 10k
>>> messages or 10s for the DataflowRunner. For the PortableRunner this seems
>>> to be 10k or an unknown timeout. This is related to the
>>> OutputAndTimeBoundedSplittableProcessElementInvoker.
>>> 4. Many readers are created (over and over) due to the
>>> UnboundedSourceAsSdfWrapperFn.
>>> 5. When a lot of readers are created, they would compete for the
>>> messages (in non-exclusive mode), eventually leaving a small number of
>>> unacknowledged messages per bundle.
>>> 6. The readers are then cached in cachedReaders in the
>>> UnboundedSourceAsSdfWrapperFn. A total of 100 readers are cached, and get
>>> evicted after a minute. See https://github.com/apache/beam/pull/13592
>>> 7. The readers each have a small number of unacknowledged messages which
>>> will remain unacknowledged and cannot be given to another consumer until
>>> the bundle finalization happens.
>>> 8. When bundle finalization happens (possibly after the reader gets
>>> evicted), the messages return to the queue, only to get taken by the huge
>>> number of other competing readers.
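[Editor's note: a back-of-envelope sketch of the contention in steps 1-8,
using the default numbers quoted in this thread; purely illustrative.]

```java
// If the broker's unacked-message window is spread evenly across the
// cached readers, each reader's bundle holds far fewer messages than the
// count that would trigger prompt finalization, so acks wait on the
// timeout while the window stays exhausted and delivery stalls.
public class UnackedWindowMath {
  // Unacked messages a single reader ends up holding under even spread.
  static int perReader(int brokerWindow, int readers) {
    return brokerWindow / readers;
  }

  public static void main(String[] args) {
    int brokerWindow = 10_000; // Solace stops delivering past this (default)
    int cachedReaders = 100;   // readers cached by UnboundedSourceAsSdfWrapperFn
    int held = perReader(brokerWindow, cachedReaders);
    System.out.println("unacked per reader ~ " + held);
    System.out.println("window exhausted: "
        + (held * cachedReaders >= brokerWindow));
  }
}
```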
>>>
>>> At this point, I'm guessing the possible fixes are:
>>> a. reduce the bundle size / the bundle timeout (both of which seem to
>>> be hardcoded per runner)
>>> b. reduce the number of cached readers / their timeouts (which don't
>>> seem to be customizable either) so that there would be less contention
>>> c. somehow reduce the splitting and instead reuse existing sources
>>> over and over
>>>
>>> I'd be happy to send pull requests to help fix this issue but perhaps
>>> will need some direction as to how I should fix this.
>>>
>>> On Wed, Jun 16, 2021 at 8:32 PM Alex Koay <alexkoa...@gmail.com> wrote:
>>>
>>>> Alright, some updates.
>>>>
>>>> Using DirectRunner helped narrow things down quite a bit. It seems that
>>>> the Solace transform is somewhat buggy when used with the
>>>> UnboundedSourceAsSDFWrapperFn as it doesn't have a proper CheckpointMark.
>>>> Refer to this:
>>>> https://github.com/SolaceProducts/solace-apache-beam/blob/d62f5b8e275902197882e90cdf87346438fae9ac/beam-sdks-java-io-solace/src/main/java/com/solace/connector/beam/UnboundedSolaceSource.java#L40
>>>>
>>>> The source simply creates a new Reader every time createReader() is
>>>> called.
>>>>
>>>> Because of this, the cachedReaders in the
>>>> UnboundedSourceAsSDFRestrictionTracker are never purged; the readers
>>>> are never closed and simply stay in the cache.
>>>> Changing the timeout causes the pipeline to continue draining but at a
>>>> glacial pace.
>>>>
>>>> I've still not been able to isolate the root cause of why it suddenly
>>>> stops reading more data (it could be a Solace issue though).
>>>>
>>>>
>>>> Also, trying the easy way out, I've tried running it with 2.24.0 (the
>>>> last one without the SDF default Read) in Java and it works perfectly.
>>>> Newer versions in Java DirectRunner don't work correctly either.
>>>> Unfortunately, Dataflow seems to expand the external transform using the
>>>> SDF Read version even when using 2.24.0 (I'm not entirely sure why this is
>>>> the case).
>>>>
>>>> I feel like I'm almost on the verge of fixing the problem, but at this
>>>> point I'm still far from it.
>>>>
>>>>
>>>> On Wed, Jun 16, 2021 at 11:24 AM Alex Koay <alexkoa...@gmail.com>
>>>> wrote:
>>>>
>>>>> 1. I'm building a streaming pipeline.
>>>>> 2. For the pure Java transforms pipeline I believe it got substituted
>>>>> with a Dataflow native Solace transform (it isn't using use_runner_v2 as I
>>>>> think Java doesn't support that publicly yet). I used the default Java
>>>>> flags with a DataflowRunner.
>>>>> 3. I believe it's the source reader that is being created en masse.
>>>>>
>>>>> Currently I just tested the Python pipeline (with Java Solace
>>>>> transform) on the DirectRunner without bounds, and it seems that the issue
>>>>> is similarly manifesting. I'm trying to debug it this way for now.
>>>>>
>>>>> On Wed, Jun 16, 2021 at 9:01 AM Boyuan Zhang <boyu...@google.com>
>>>>> wrote:
>>>>>
>>>>>> In terms of the odd case you are experiencing, it seems like you are
>>>>>> comparing a pure Java pipeline with a cross-language pipeline, right? I
>>>>>> want to learn more details on this case:
>>>>>>
>>>>>>    - Is this a batch pipeline or a streaming pipeline?
>>>>>>    - For your pure Java transforms pipeline, do you run the pipeline
>>>>>>    with 'use_runner_v2' or 'beam_fn_api' or 'use_unified_worker'?
>>>>>>    - For a large number of consumers, do you mean dataflow workers
>>>>>>    or the source reader?
>>>>>>
>>>>>> If you can share the implementation of the source and the pipeline,
>>>>>> that would be really helpful.
>>>>>>
>>>>>> +Lukasz Cwik <lc...@google.com> for awareness.
>>>>>>
>>>>>> On Tue, Jun 15, 2021 at 9:50 AM Chamikara Jayalath <
>>>>>> chamik...@google.com> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jun 15, 2021 at 3:20 AM Alex Koay <alexkoa...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Several questions:
>>>>>>>>
>>>>>>>> 1. Is there any way to set the log level for the Java workers via a
>>>>>>>> Python Dataflow pipeline?
>>>>>>>>
>>>>>>>
>>>>>>>> 2. What is the easiest way to debug an external transform in Java?
>>>>>>>> My main pipeline code is in Python.
>>>>>>>>
>>>>>>>
>>>>>>> In general, debugging a job should be similar to any other Dataflow
>>>>>>> job [1]. But some of the SDK options available to the main SDK 
>>>>>>> environment
>>>>>>> are currently not available to external SDK environments. This includes
>>>>>>> changing the debug level. So I suggest adding INFO logs instead of 
>>>>>>> changing
>>>>>>> the debug level if possible.
>>>>>>>
>>>>>>> [1]
>>>>>>> https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> 3. Are there any edge cases with the UnboundedSourceWrapperFn SDF
>>>>>>>> that I should be wary of? I'm currently encountering an odd case (in
>>>>>>>> Dataflow) where a Java pipeline runs with only one worker all the
>>>>>>>> way through while reading Solace messages, but with an external
>>>>>>>> transform in Python, it generates a large number of consumers and
>>>>>>>> stops reading messages altogether about 90% of the way through.
>>>>>>>> about 90% of the way.
>>>>>>>>
>>>>>>>
>>>>>>> +Boyuan Zhang <boyu...@google.com> might be able to help.
>>>>>>>
>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>> Cheers
>>>>>>>> Alex
>>>>>>>>
>>>>>>>
