Right, so in that case we really have one knob to turn: the number of
records per bundle (bundle size). We would still want to choose some
reasonable upper bound on the number of records being read. Say the
collection/partition being read from has 2 billion documents and we
decide to split at 100k documents: that would mean Beam could spin up
20k parallel workers if it wanted to. I expect that would overwhelm the
source DB. And yes, I could increase the bundle size, but that could
cause other performance issues. What I'm really after is two knobs: the
number of records being read, and the number of readers processing those
records at the same time.
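For concreteness, the worker-count arithmetic above can be sketched in a few lines (the class and method names here are purely illustrative, not part of Beam):

```java
// Illustrative split-count math: how many id-range bundles a source
// would produce for a given collection size and bundle size.
public class SplitMath {
    // Ceiling division: the number of splits needed to cover totalDocs.
    public static long splitCount(long totalDocs, long bundleSize) {
        return (totalDocs + bundleSize - 1) / bundleSize;
    }

    public static void main(String[] args) {
        // The example above: 2 billion documents, split every 100k.
        System.out.println(splitCount(2_000_000_000L, 100_000L)); // prints 20000
    }
}
```

With MongoDbIO the split count can be capped via withNumSplits, and on Dataflow concurrency can be capped with --maxNumWorkers; together those are roughly the two knobs described above, though the worker cap applies to the whole pipeline rather than a single step.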

Is my understanding here correct?

Thanks!

On Wed, Apr 9, 2025 at 8:34 AM Radek Stankiewicz <radosl...@google.com>
wrote:

> in the context of MongoDB, there are already configuration options:
>
> https://github.com/apache/beam/blob/7136380c4a79f8dea9b42a42ee7569b665edf431/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java#L230
> bucketAuto or numSplits.
>
> The exact logic for how it splits is written here:
> https://github.com/apache/beam/blob/7136380c4a79f8dea9b42a42ee7569b665edf431/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java#L500
>
> On Wed, Apr 9, 2025 at 5:25 PM Jonathan Hope <
> jonathan.douglas.h...@gmail.com> wrote:
>
>> I think I'm still struggling a bit...
>>
>> Let's stick with a bounded example for now. I would be reading from a
>> single Mongo cluster/database/collection/partition that has billions of
>> documents in it. I read through the MongoDbIO code a bit and it seems to:
>>
>>
>>    1. Get the min ID
>>    2. Get the max ID
>>    3. Split that range by bundle size
>>    4. Read those ranges
>>
>> In the case of billions that would be a large number of splits. My
>> (possibly incorrect) understanding is that Beam would try to go parallel
>> here. Ideally I would have some way to say: only read x bundles at once.
>>
>> Thanks!
>>
>> On Wed, Apr 9, 2025 at 5:32 AM Radek Stankiewicz <radosl...@google.com>
>> wrote:
>>
>>> hey Jonathan,
>>>
>>> parallelism for reads and writes is directly related to the number of
>>> keys you are processing in the current stage.
>>> As an example, imagine you have KafkaIO with 1 partition, then a step
>>> mapping each record to a JDBC entity, and then a step writing to the
>>> database.
>>> If you keep all those steps in the same stage, you will have a
>>> single-threaded write to the database.
>>> If you add a Redistribute transform with numKeys set to N, you may have
>>> up to N parallel writes. More and more IOs are leveraging this transform
>>> to control parallelism.
>>> Rate limiting is per worker; if you have a backlog and unlimited keys,
>>> the runner may decide to add more workers, which will eventually
>>> increase the number of calls.
>>>
>>> If you are using Java and JdbcIO, you can specify the number of splits
>>> by setting numPartitions and specifying the partitioning column.
>>>
>>> What I would recommend investigating is connection pooling, as this is
>>> also a common pain point with operational databases. For JDBC, HikariCP
>>> is pretty easy to integrate.
>>>
>>> On Thu, Apr 3, 2025 at 12:32 AM Jonathan Hope <
>>> jonathan.douglas.h...@gmail.com> wrote:
>>>
>>>> Hello all,
>>>>
>>>> The pipeline I'm working on will be run against two different databases
>>>> that are both online, so I will have to control the amount of data being
>>>> read/written to maintain QoS. To do that I would have to control the
>>>> number of workers executing a given step in parallel.
>>>>
>>>> I saw this issue on GitHub: https://github.com/apache/beam/issues/17835.
>>>> However, it has been closed and I don't see any related PRs/comments/etc.
>>>> Did that work get done, or was it just cancelled?
>>>>
>>>> I also saw this post:
>>>> https://medium.com/art-of-data-engineering/steady-as-she-flows-rate-limiting-in-apache-beam-pipelines-42cab0b7f31d.
>>>> That would definitely work, but it would result in workers spinning up and
>>>> waiting on locks (which costs money).
>>>>
>>>> Perhaps this is more of a runner concern though, and I did see a way to
>>>> limit the maximum number of workers here:
>>>> https://cloud.google.com/dataflow/docs/reference/pipeline-options. I
>>>> believe that would also work, but then the max number of workers would
>>>> be dictated by whichever step has the highest performance cost, which
>>>> would make the pipeline slower overall.
>>>>
>>>> Thanks!
>>>>
>>>
