Thanks for the quick response. Querying the Dataflow API seems like something that could break easily, but I can go with that if it turns out to be easier.
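For reference, if the API route does win out, I imagine wiring it in as a slowly-updating side input along the lines of the pattern Evan linked. A rough, untested sketch of what I mean is below — fetchCurrentWorkerCount() is just a placeholder for whatever Dataflow API lookup ends up working (e.g. parsing the latest autoscaling message as in the StackOverflow answer), not a real Beam or Dataflow call:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Latest;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;

class WorkerCountSideInput {

  // Placeholder: would call the Dataflow API and parse the latest autoscaling
  // message (see [2] in the quoted thread below). Not a real client call.
  static int fetchCurrentWorkerCount() {
    return 1;
  }

  // Singleton side input refreshed roughly once a minute with the latest estimate.
  static PCollectionView<Integer> workerCountView(Pipeline pipeline) {
    return pipeline
        .apply("Tick", GenerateSequence.from(0).withRate(1, Duration.standardMinutes(1)))
        .apply("FetchWorkerCount", ParDo.of(new DoFn<Long, Integer>() {
          @ProcessElement
          public void process(OutputReceiver<Integer> out) {
            out.output(fetchCurrentWorkerCount());
          }
        }))
        .apply("KeepLatest",
            Window.<Integer>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                .discardingFiredPanes())
        .apply(Latest.globally())
        .apply(View.asSingleton());
  }
}
```

The throttling DoFn would then take workerCountView as a side input and recompute its local rate at the start of each bundle.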
The Splittable DoFn way sounds interesting, but I'm not very familiar with it, so I have some questions around it: Splits seem to operate on offsets within a single element. Does that mean that I'd set a fixed shard number x, and then I'd need to first group my PCollection of single elements into a PCollection of lists, each of size x? And are the subsequent writes also limited to x workers, meaning that splits have the same issue as with a GroupByKey? I see the UnboundedCountingSource gets a `desiredNumSplits` parameter. I'm assuming there is nothing similar that would allow a Splittable DoFn to simply figure out the number of workers even if it changes? That's probably very hacky anyway.

If the above solution with fixed-size lists makes sense and will redistribute the writes, I'm already happy; I don't necessarily need the throttling step to dynamically match autoscaling. (To make these questions concrete, I've appended a couple of rough sketches at the bottom of this message.)

On Thu, Apr 15, 2021 at 4:20 PM Pablo Estrada <[email protected]> wrote:

> You could implement a Splittable DoFn that generates a limited number of
> splits. We do something like this for
> GenerateSequence.from(X).withRate(...) via UnboundedCountingSource [1]. It
> keeps track of its local EPS and generates new splits if more EPS is
> wanted. This should help you scale up to the maximum EPS that you want,
> and autoscaling will only produce the appropriate number of workers for
> that number of splits.
>
> - The only issue may be that you can't "scale down" if you find that some
> of your splits have very low throughput, because two splits can't be
> merged back together (does that make sense?) - but Dataflow should be able
> to scale down and schedule multiple splits on a single worker if that's
> the case.
>
> The UnboundedCountingSource is a Source, so it can't have an input (and
> it's deprecated), but you could write a Splittable DoFn that has the same
> behavior. Do you think this could work?
>
> [1]
> https://github.com/apache/beam/blob/8c9605f224115507912cf72e02d3fa94905548ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java#L334-L348
>
> On Thu, Apr 15, 2021 at 4:11 PM Evan Galpin <[email protected]> wrote:
>
>> Could you possibly use a side input with fixed interval triggering [1]
>> to query the Dataflow API for the most recent scaling log statement, as
>> suggested here [2]?
>>
>> [1] https://beam.apache.org/documentation/patterns/side-inputs/
>> [2] https://stackoverflow.com/a/54406878/6432284
>>
>> On Thu, Apr 15, 2021 at 18:14 Daniel Thevessen <[email protected]> wrote:
>>
>>> Hi folks,
>>>
>>> I've been working on a custom PTransform that makes requests to another
>>> service, and would like to add a rate-limiting feature there. The
>>> fundamental issue that I'm running into is that I need a decent
>>> heuristic to estimate the worker count, so that each worker can
>>> independently set a limit which globally comes out to the right value.
>>> All of this is easy if I know how many machines I have, but I'd like to
>>> use Dataflow's autoscaling, which would easily break any pre-configured
>>> value. I have seen two main approaches for rate limiting, both for a
>>> configurable variable x:
>>>
>>> - Simply assume the worker count is x, then divide by x to figure out
>>> the "local" limit. The issue I have here is that if we assume x is 500
>>> but it is actually 50, I'm now paying for 50 nodes to throttle 10 times
>>> as much as necessary. I know the pipeline options have a reference to
>>> the runner; is it possible to get an approximate current worker count
>>> from that at bundle start (*if* the runner is DataflowRunner)?
>>> - Add another PTransform in front of the API requests, which groups by
>>> x keys, throttles, and keeps forwarding elements with an instant
>>> trigger. I initially really liked this solution because even if x is
>>> misconfigured, I will have at most x workers running and throttle
>>> appropriately. However, I noticed that for batch pipelines this
>>> effectively also caps the API request stage at x workers. If I throw in
>>> a `Reshuffle`, there is another GroupByKey (-> another stage), and
>>> nothing gets done until every element has passed through the throttler.
>>>
>>> Has anyone here tried to figure out rate limiting with Beam before, and
>>> perhaps run into similar issues? I would love to know if there is a
>>> preferred solution to this type of problem.
>>> I know sharing state like that runs a little counter to the Beam
>>> pipeline paradigm, but really all I need is an approximate worker count
>>> with few guarantees.
>>>
>>> Cheers,
>>> Daniel
>>>

--
Daniel Thevessen | Site Reliability Engineer
Firestore SRE
San Francisco, CA, USA | +1 (415) 373-7762
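P.S. To make the Splittable DoFn questions above concrete, here is the rough, untested shape I was picturing for the fixed-size-list variant: each element is a pre-built batch, and the restriction is an offset range over that batch, so the runner is free to split one batch across workers. The rate limiting and the actual service call are left as placeholders, and this is only my sketch, not the CountingSource-style approach Pablo described:

```java
import java.util.List;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

// Sketch only: the restriction is an offset range over the incoming batch.
// OffsetRange provides a default tracker, so no @NewTracker method is needed.
@DoFn.BoundedPerElement
class ThrottledRequestFn extends DoFn<List<String>, String> {

  @GetInitialRestriction
  public OffsetRange initialRestriction(@Element List<String> batch) {
    return new OffsetRange(0, batch.size());
  }

  @ProcessElement
  public void process(
      @Element List<String> batch,
      RestrictionTracker<OffsetRange, Long> tracker,
      OutputReceiver<String> out) {
    for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); i++) {
      // Placeholder: acquire a rate-limiter permit and make the external request here.
      out.output(batch.get((int) i));
    }
  }
}
```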
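And for completeness, a rough sketch of the second approach from my original message (fixed shard keys plus a per-shard limiter). It assumes plain Guava's RateLimiter is on the classpath, and it carries the batch caveat I mentioned, since nothing downstream runs until the GroupByKey completes:

```java
import java.util.concurrent.ThreadLocalRandom;

import com.google.common.util.concurrent.RateLimiter;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

// Sketch only: caps parallelism at numShards by keying elements onto numShards
// keys, then throttles each shard to globalEps / numShards.
class ShardedThrottle<T> extends PTransform<PCollection<T>, PCollection<T>> {

  private final int numShards;
  private final double globalEps;

  ShardedThrottle(int numShards, double globalEps) {
    this.numShards = numShards;
    this.globalEps = globalEps;
  }

  @Override
  public PCollection<T> expand(PCollection<T> input) {
    final int shards = numShards;
    final double perShardEps = globalEps / numShards;
    return input
        .apply("AssignShard",
            WithKeys.<Integer, T>of(e -> ThreadLocalRandom.current().nextInt(shards))
                .withKeyType(TypeDescriptors.integers()))
        .apply("CollectShards", GroupByKey.<Integer, T>create())
        .apply("ThrottledForward", ParDo.of(new DoFn<KV<Integer, Iterable<T>>, T>() {
          private transient RateLimiter limiter;

          @Setup
          public void setup() {
            limiter = RateLimiter.create(perShardEps);
          }

          @ProcessElement
          public void process(@Element KV<Integer, Iterable<T>> shard, OutputReceiver<T> out) {
            for (T element : shard.getValue()) {
              limiter.acquire(); // block until this shard's budget allows one more element
              out.output(element);
            }
          }
        }))
        .setCoder(input.getCoder());
  }
}
```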
