You could implement a Splittable DoFn that generates a limited number of
splits. We do something like this for
GenerateSequence.from(X).withRate(...) via UnboundedCountingSource[1]: each
split keeps track of its local rate (elements per second), and new splits
are generated only when more throughput is needed. This should let you
scale up to the maximum rate you want, and autoscaling will only provision
the number of workers appropriate for that number of splits.
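
For illustration, that source is used roughly like this (the numbers are
made up; assume the usual Beam and Joda-Time imports):

  // Emits at most ~1000 elements per second in aggregate; the underlying
  // UnboundedCountingSource adds splits only as needed to sustain that rate.
  PCollection<Long> ticks =
      p.apply(GenerateSequence.from(0).withRate(1000, Duration.standardSeconds(1)));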

- The only caveat is that you can't "scale down" if you find that some of
your splits end up with very low throughput, because two splits can't be
merged back together (does that make sense?). But Dataflow should still be
able to scale down and schedule multiple splits on a single worker in that
case.

The UnboundedCountingSource is a Source, so it can't have an input (and
it's deprecated), but you could write a SplittableDoFn that has the same
behavior. Do you think this could work?
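
To make it concrete, here is a very rough sketch of the kind of
SplittableDoFn I have in mind. The class name, the sharding scheme, and the
one-second resume interval are all made up, and a real implementation would
also need a watermark estimator plus catch-up logic like
UnboundedCountingSource has. You'd apply it to a single dummy element (e.g.
Create.of(0L)) and use the emitted ticks to gate the API calls.

import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.joda.time.Duration;
import org.joda.time.Instant;

// Sketch: emits "ticks" from up to maxShards parallel restrictions, each
// contributing roughly elementsPerSecond / maxShards elements per second.
@DoFn.UnboundedPerElement
public class RateLimitedTicks extends DoFn<Long, Long> {

  // Each shard owns a contiguous block of offsets (up to 1024 shards here).
  private static final long SHARD_SPACE = Long.MAX_VALUE / 1024;

  private final int maxShards;
  private final long elementsPerSecond;

  public RateLimitedTicks(int maxShards, long elementsPerSecond) {
    this.maxShards = maxShards;
    this.elementsPerSecond = elementsPerSecond;
  }

  @GetInitialRestriction
  public OffsetRange initialRestriction(@Element Long ignored) {
    return new OffsetRange(0, (long) maxShards * SHARD_SPACE);
  }

  @SplitRestriction
  public void splitRestriction(
      @Restriction OffsetRange range, OutputReceiver<OffsetRange> out) {
    // Pre-split into one restriction per shard; this caps the parallelism of
    // this step, so autoscaling won't go past maxShards for it.
    for (long start = range.getFrom(); start < range.getTo(); start += SHARD_SPACE) {
      out.output(new OffsetRange(start, Math.min(start + SHARD_SPACE, range.getTo())));
    }
  }

  @ProcessElement
  public ProcessContinuation process(
      RestrictionTracker<OffsetRange, Long> tracker, OutputReceiver<Long> out) {
    long next = tracker.currentRestriction().getFrom();
    if (!tracker.tryClaim(next)) {
      return ProcessContinuation.stop();
    }
    // Emit this shard's share of the global rate, then ask to be resumed in
    // about a second. A real version would measure how far behind it is and
    // adjust, like UnboundedCountingSource does.
    long perShard = Math.max(1, elementsPerSecond / maxShards);
    long shard = next / SHARD_SPACE;
    Instant now = Instant.now();
    for (long i = 0; i < perShard; i++) {
      out.outputWithTimestamp(shard, now);
    }
    return ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(1));
  }
}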


[1]
https://github.com/apache/beam/blob/8c9605f224115507912cf72e02d3fa94905548ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java#L334-L348

On Thu, Apr 15, 2021 at 4:11 PM Evan Galpin <evan.gal...@gmail.com> wrote:

> Could you possibly use a side input with fixed interval triggering[1] to
> query the Dataflow API to get the most recent log statement of scaling as
> suggested here[2]?
>
> [1]
> https://beam.apache.org/documentation/patterns/side-inputs/
> [2]
> https://stackoverflow.com/a/54406878/6432284
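
(Inline note on Evan's suggestion: the slowly updating side input pattern
from the Beam patterns page he links would look roughly like this, where
fetchCurrentWorkerCount() is a made-up helper standing in for the Dataflow
API / scaling-log query:)

// Refreshes a worker-count estimate once a minute and exposes the latest
// value as a singleton side input.
PCollectionView<Integer> workerCount =
    p.apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(1)))
        .apply(
            Window.<Long>into(new GlobalWindows())
                .triggering(
                    Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                .discardingFiredPanes())
        .apply(
            ParDo.of(
                new DoFn<Long, Integer>() {
                  @ProcessElement
                  public void process(OutputReceiver<Integer> out) {
                    // Hypothetical: query the Dataflow API for the current
                    // number of workers.
                    out.output(fetchCurrentWorkerCount());
                  }
                }))
        .apply(Latest.globally())
        .apply(View.asSingleton());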
>
> On Thu, Apr 15, 2021 at 18:14 Daniel Thevessen <dant...@google.com> wrote:
>
>> Hi folks,
>>
>> I've been working on a custom PTransform that makes requests to another
>> service, and would like to add a rate limiting feature there. The
>> fundamental issue that I'm running into here is that I need a decent
>> heuristic to estimate the worker count, so that each worker can
>> independently set a limit which globally comes out to the right value. All
>> of this is easy if I know how many machines I have, but I'd like to use
>> Dataflow's autoscaling, which would easily break any pre-configured value.
>> I have seen two main approaches for rate limiting, both for a
>> configurable variable x:
>>
>>    - Simply assume worker count is x, then divide by x to figure out the
>>    "local" limit. The issue I have here is that if we assume x is 500, but it
>>    is actually 50, I'm now paying for 50 nodes to throttle 10 times as much
>>    as necessary. I know the pipeline options have a reference to the runner, is
>>    it possible to get an approximate current worker count from that at bundle
>>    start (*if* runner is DataflowRunner)?
>>    - Add another PTransform in front of the API requests, which groups
>>    by x number of keys, throttles, and keeps forwarding elements with an
>>    instant trigger. I initially really liked this solution because even if x
>>    is misconfigured, I will have at most x workers running and throttle
>>    appropriately. However, I noticed that for batch pipelines, this
>>    effectively also caps the API request stage at x workers. If I throw in a
>>    `Reshuffle`, there is another GroupByKey (-> another stage), and nothing
>>    gets done until every element has passed through the throttler.
>>
>> Has anyone here tried to figure out rate limiting with Beam before, and
>> perhaps run into similar issues? I would love to know if there is a
>> preferred solution to this type of problem.
>> I know sharing state like that runs a little counter to the Beam pipeline
>> paradigm, but really all I need is an approximate worker count with few
>> guarantees.
>>
>> Cheers,
>> Daniel
>>
>
