Thanks for the quick response.
Querying the Dataflow API seems like something that could break easily, but
I can go with that if it turns out to be easier.
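
Just to check that I understand the suggestion: in a streaming pipeline that
would look roughly like the "slowly updating side input" pattern sketched
below (untested; fetchCurrentWorkerCount() is just a placeholder for whatever
Dataflow API call or log query we would actually do), with the view then
handed to the request DoFn via .withSideInputs(...):

// Re-query the worker count once a minute and expose it as a singleton side input.
PCollectionView<Integer> workerCountView =
    p.apply("Tick", GenerateSequence.from(0).withRate(1, Duration.standardMinutes(1)))
        .apply("Refresh", Window.<Long>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
            .discardingFiredPanes())
        .apply("QueryWorkerCount", MapElements
            .into(TypeDescriptors.integers())
            .via((Long tick) -> fetchCurrentWorkerCount()))  // placeholder
        .apply(View.asSingleton());

I think that only really helps in the streaming case, though.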

The Splittable DoFn way sounds interesting, but I'm not very familiar with
it, so I have a few questions:
Splits seem to operate on offsets within a single element. Does that mean
I'd pick a fixed shard number x and first group my PCollection of single
elements into a PCollection of lists, each of size x? And are the subsequent
writes then also limited to x workers, meaning that splits have the same
issue as with a GroupByKey?
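
To make that question more concrete, this is the rough (completely untested)
shape I have in mind, assuming each element is already one of those
fixed-size lists; callExternalService and PER_WORKER_QPS are just
placeholders:

// Sketch: the restriction is an offset range over positions within one batch,
// so the runner can split the remaining work in a batch across workers.
@DoFn.BoundedPerElement
class ThrottledWriteFn extends DoFn<List<String>, String> {

  private static final double PER_WORKER_QPS = 25.0;  // placeholder

  // Guava RateLimiter; one per DoFn instance, and Dataflow may run several
  // instances per worker, which is part of why the global math stays fuzzy.
  private transient RateLimiter limiter;

  @Setup
  public void setup() {
    limiter = RateLimiter.create(PER_WORKER_QPS);
  }

  @GetInitialRestriction
  public OffsetRange getInitialRestriction(@Element List<String> batch) {
    return new OffsetRange(0, batch.size());
  }

  @ProcessElement
  public void processElement(
      @Element List<String> batch,
      RestrictionTracker<OffsetRange, Long> tracker,
      OutputReceiver<String> out) {
    for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); i++) {
      limiter.acquire();                                    // local throttling
      out.output(callExternalService(batch.get((int) i)));  // placeholder request
    }
  }
}
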
I see that UnboundedCountingSource takes a `desiredNumSplits` parameter. I'm
assuming there is nothing similar that would let a Splittable DoFn simply
figure out the number of workers, even as it changes? That's probably very
hacky anyway.
If the above solution with fixed-size lists makes sense and will
redistribute the writes, I'm already happy; I don't necessarily need the
throttling step to dynamically match autoscaling.
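
For completeness, the batching step I'm picturing in front of that would be
something like the sketch below (also untested; numKeys and batchSize are
the knobs I'd have to pick up front):

// Assign a small fixed number of keys, batch into lists of size x with
// GroupIntoBatches, then hand each list to the splittable ThrottledWriteFn.
static PCollection<String> throttledWrites(
    PCollection<String> requests, int numKeys, int batchSize) {
  return requests
      .apply("AssignKeys", WithKeys.of((String r) -> ThreadLocalRandom.current().nextInt(numKeys))
          .withKeyType(TypeDescriptors.integers()))
      .apply("BatchIntoLists", GroupIntoBatches.ofSize(batchSize))
      .apply("ToLists", MapElements
          .into(TypeDescriptors.lists(TypeDescriptors.strings()))
          .via((KV<Integer, Iterable<String>> kv) -> ImmutableList.copyOf(kv.getValue())))
      .apply("ThrottledWrite", ParDo.of(new ThrottledWriteFn()));
}

I realize GroupIntoBatches is itself keyed and stateful, so the batching step
would still be capped at numKeys workers; the hope is just that the
splittable write step after it isn't.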

On Thu, Apr 15, 2021 at 4:20 PM Pablo Estrada <[email protected]> wrote:

> You could implement a Splittable DoFn that generates a limited number of
> splits. We do something like this for
> GenerateSequence.from(X).withRate(...) via UnboundedCountingSource[1]. It
> keeps track of its local EPS (elements per second) and generates new splits
> if a higher EPS is wanted. This should help you scale up to the maximum EPS
> that you want, and autoscaling will only produce the appropriate number of
> workers for that number of splits.
>
> - The only issue may be that you can't "scale down" if you find that some
> of your splits have a very low throughput, because two splits can't be
> merged back together (does that make sense?) - but Dataflow should be able
> to scale down and schedule multiple splits in a single worker if that's the
> case.
>
> The UnboundedCountingSource is a Source, so it can't have an input (and
> it's deprecated), but you could write a SplittableDoFn that has the same
> behavior. Do you think this could work?
>
>
> [1]
> https://github.com/apache/beam/blob/8c9605f224115507912cf72e02d3fa94905548ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java#L334-L348
>
> On Thu, Apr 15, 2021 at 4:11 PM Evan Galpin <[email protected]> wrote:
>
>> Could you possibly use a side input with fixed interval triggering[1] to
>> query the Dataflow API for the most recent autoscaling log statement, as
>> suggested here[2]?
>>
>> [1]
>> https://beam.apache.org/documentation/patterns/side-inputs/
>> [2]
>> https://stackoverflow.com/a/54406878/6432284
>>
>> On Thu, Apr 15, 2021 at 18:14 Daniel Thevessen <[email protected]>
>> wrote:
>>
>>> Hi folks,
>>>
>>> I've been working on a custom PTransform that makes requests to another
>>> service, and would like to add a rate limiting feature there. The
>>> fundamental issue that I'm running into here is that I need a decent
>>> heuristic to estimate the worker count, so that each worker can
>>> independently set a limit which globally comes out to the right value. All
>>> of this is easy if I know how many machines I have, but I'd like to use
>>> Dataflow's autoscaling, which would easily break any pre-configured value.
>>> I have seen two main approaches for rate limiting, both for a
>>> configurable variable x:
>>>
>>>    - Simply assume the worker count is x, then divide by x to figure out
>>>    the "local" limit. The issue here is that if we assume x is 500 but it
>>>    is actually 50, I'm now paying for 50 nodes to throttle 10 times as
>>>    much as necessary. I know the pipeline options have a reference to the
>>>    runner; is it possible to get an approximate current worker count from
>>>    that at bundle start (*if* the runner is DataflowRunner)?
>>>    - Add another PTransform in front of the API requests, which groups
>>>    elements by x keys, throttles, and keeps forwarding them with an
>>>    instant trigger. I initially really liked this solution because even if x
>>>    is misconfigured, I will have at most x workers running and throttle
>>>    appropriately. However, I noticed that for batch pipelines, this
>>>    effectively also caps the API request stage at x workers. If I throw in a
>>>    `Reshuffle`, there is another GroupByKey (-> another stage), and nothing
>>>    gets done until every element has passed through the throttler.
>>>
>>> Has anyone here tried to figure out rate limiting with Beam before, and
>>> perhaps run into similar issues? I would love to know if there is a
>>> preferred solution to this type of problem.
>>> I know sharing state like that runs a little counter to the Beam
>>> pipeline paradigm, but really all I need is an approximate worker count
>>> with few guarantees.
>>>
>>> Cheers,
>>> Daniel
>>>
>>

-- 

Daniel Thevessen  |  Site Reliability Engineer

Firestore SRE
San Francisco, CA, USA |  +1 (415) 373-7762
