If the service returns sensible throttling errors, you could use a stateful DoFn: buffer elements that error out due to throttling instead of failing the bundle, and schedule a timer to replay them. This will effectively max out the service as long as there is more data than the service can handle, which doesn't work too well if the service…
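A rough sketch of that buffer-and-replay pattern; callService() and ThrottledException are hypothetical stand-ins for your real client and its throttling error:

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

class BufferOnThrottleFn extends DoFn<KV<String, String>, String> {

  // Elements the service throttled, waiting for replay.
  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec =
      StateSpecs.bag(StringUtf8Coder.of());

  // Processing-time timer that drains the buffer.
  @TimerId("retry")
  private final TimerSpec retrySpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void process(
      ProcessContext c,
      @StateId("buffer") BagState<String> buffer,
      @TimerId("retry") Timer retry) {
    try {
      c.output(callService(c.element().getValue()));
    } catch (ThrottledException e) {
      // Buffer instead of failing the bundle, and replay in 30s.
      buffer.add(c.element().getValue());
      retry.offset(Duration.standardSeconds(30)).setRelative();
    }
  }

  @OnTimer("retry")
  public void onRetry(
      OnTimerContext c,
      @StateId("buffer") BagState<String> buffer,
      @TimerId("retry") Timer retry) {
    List<String> pending = new ArrayList<>();
    buffer.read().forEach(pending::add);
    buffer.clear();
    boolean stillThrottled = false;
    for (String element : pending) {
      try {
        c.output(callService(element));
      } catch (ThrottledException e) {
        buffer.add(element);  // still throttled; keep it for the next attempt
        stillThrottled = true;
      }
    }
    if (stillThrottled) {
      retry.offset(Duration.standardSeconds(30)).setRelative();
    }
  }

  // Hypothetical client; throws ThrottledException on e.g. HTTP 429.
  private String callService(String request) throws ThrottledException {
    throw new UnsupportedOperationException("wire in the real client here");
  }

  private static class ThrottledException extends Exception {}
}

Note that state and timers require a keyed input and the buffer is per key and window, so you'd want keys with enough fan-out; the fixed replay delay also acts as a crude backoff rather than a precise rate limit.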
On Fri, Apr 16, 2021 at 6:20 PM Daniel Thevessen <[email protected]> wrote:

> Thanks for the quick response. Querying the Dataflow API seems like
> something that could break easily, but I can go with that if it turns out
> to be easier.
>
> The Splittable DoFn way sounds interesting, but I'm not very familiar with
> it, so I have some questions around it:
> Splits seem to operate on offsets within a single element. Does that mean
> I'd set a fixed shard number x, and then need to first group my
> PCollection of single elements into a PCollection of lists, each of size
> x? And are the subsequent writes also limited to x workers, meaning that
> splits have the same issue as a GroupByKey?
> I see UnboundedCountingSource gets a `desiredNumSplits` parameter. I'm
> assuming there is nothing similar that would allow a Splittable DoFn to
> simply figure out the number of workers, even if it changes? That's
> probably very hacky anyway.
> If the above solution with fixed-size lists makes sense and redistributes
> the writes, I'm already happy; I don't necessarily need the throttling
> step to dynamically match autoscaling.
>
> On Thu, Apr 15, 2021 at 4:20 PM Pablo Estrada <[email protected]> wrote:
>
>> You could implement a Splittable DoFn that generates a limited number of
>> splits. We do something like this for
>> GenerateSequence.from(X).withRate(...) via UnboundedCountingSource [1]:
>> it keeps track of its local EPS, and generates new splits if more EPS is
>> wanted. This should help you scale up to the maximum EPS that you want,
>> and autoscaling will only produce the appropriate number of workers for
>> that number of splits.
>>
>> The only issue may be that you can't "scale down" if you find that some
>> of your splits have very low throughput, because two splits can't be
>> merged back together (does that make sense?) - but Dataflow should be
>> able to scale down and schedule multiple splits on a single worker if
>> that's the case.
>>
>> The UnboundedCountingSource is a Source, so it can't have an input (and
>> it's deprecated), but you could write a Splittable DoFn that has the same
>> behavior. Do you think this could work?
>>
>> [1]
>> https://github.com/apache/beam/blob/8c9605f224115507912cf72e02d3fa94905548ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java#L334-L348
>>
>> On Thu, Apr 15, 2021 at 4:11 PM Evan Galpin <[email protected]> wrote:
>>
>>> Could you possibly use a side input with fixed interval triggering [1]
>>> to query the Dataflow API for the most recent autoscaling log statement,
>>> as suggested here [2]?
>>>
>>> [1] https://beam.apache.org/documentation/patterns/side-inputs/
>>> [2] https://stackoverflow.com/a/54406878/6432284
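For reference, the side-input approach Evan describes usually follows the
"slowly updating side input" pattern from [1]. A sketch, where
fetchWorkerCount() is a placeholder for the Dataflow API / log lookup from
[2], and `pipeline` is your Pipeline object:

import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;

// Poll once a minute and publish the latest value as a singleton side input.
PCollectionView<Integer> workerCount =
    pipeline
        .apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(1)))
        .apply(ParDo.of(new DoFn<Long, Integer>() {
          @ProcessElement
          public void process(ProcessContext c) {
            c.output(fetchWorkerCount());  // placeholder: Dataflow API / logs
          }
        }))
        .apply(Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(1)))
            .triggering(Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane()))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes())
        .apply(View.asSingleton());

// Placeholder: e.g. query the job's current worker count via the Dataflow
// API or the autoscaling log messages, as suggested in [2].
static Integer fetchWorkerCount() {
  return 1;
}

Downstream, each worker would read c.sideInput(workerCount) in its
@ProcessElement and divide the global limit by it; note that the main
input's windowing has to map onto the side input's windows for the lookup
to work.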
>>> On Thu, Apr 15, 2021 at 18:14 Daniel Thevessen <[email protected]> wrote:
>>>
>>>> Hi folks,
>>>>
>>>> I've been working on a custom PTransform that makes requests to another
>>>> service, and would like to add a rate-limiting feature there. The
>>>> fundamental issue I'm running into is that I need a decent heuristic to
>>>> estimate the worker count, so that each worker can independently set a
>>>> limit which globally comes out to the right value. All of this is easy
>>>> if I know how many machines I have, but I'd like to use Dataflow's
>>>> autoscaling, which would easily break any pre-configured value.
>>>>
>>>> I have seen two main approaches for rate limiting, both for a
>>>> configurable variable x:
>>>>
>>>> - Simply assume the worker count is x, then divide by x to figure out
>>>> the "local" limit. The issue here is that if we assume x is 500 but it
>>>> is actually 50, I'm now paying for 50 nodes to throttle ten times as
>>>> much as necessary. I know the pipeline options have a reference to the
>>>> runner; is it possible to get an approximate current worker count from
>>>> that at bundle start (*if* the runner is DataflowRunner)?
>>>> - Add another PTransform in front of the API requests, which groups by
>>>> x keys, throttles, and keeps forwarding elements with an instant
>>>> trigger. I initially really liked this solution because even if x is
>>>> misconfigured, I will have at most x workers running and throttle
>>>> appropriately. However, I noticed that for batch pipelines this
>>>> effectively also caps the API-request stage at x workers. If I throw in
>>>> a `Reshuffle`, there is another GroupByKey (-> another stage), and
>>>> nothing gets done until every element has passed through the throttler.
>>>>
>>>> Has anyone here tried to figure out rate limiting with Beam before, and
>>>> perhaps run into similar issues? I would love to know if there is a
>>>> preferred solution to this type of problem.
>>>> I know sharing state like that runs a little counter to the Beam
>>>> pipeline paradigm, but really all I need is an approximate worker count
>>>> with few guarantees.
>>>>
>>>> Cheers,
>>>> Daniel
>
> --
>
> Daniel Thevessen | Site Reliability Engineer
>
> Firestore SRE
> San Francisco, CA, USA | +1 (415) 373-7762
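As a point of reference for approach 1 in the original message: the fixed
divide-by-x limiter usually boils down to a per-instance Guava RateLimiter,
as in this sketch. GLOBAL_RPS, ASSUMED_WORKERS, and callService() are
made-up placeholders, and the sketch inherits exactly the misconfiguration
problem described above:

import com.google.common.util.concurrent.RateLimiter;
import org.apache.beam.sdk.transforms.DoFn;

class FixedShareThrottleFn extends DoFn<String, String> {
  private static final double GLOBAL_RPS = 1000.0;  // assumed global budget
  private static final int ASSUMED_WORKERS = 50;    // the "x" being guessed

  private transient RateLimiter limiter;

  @Setup
  public void setup() {
    // Each DoFn instance takes an equal share of the assumed budget.
    limiter = RateLimiter.create(GLOBAL_RPS / ASSUMED_WORKERS);
  }

  @ProcessElement
  public void process(ProcessContext c) {
    limiter.acquire();  // block until a permit is available
    c.output(callService(c.element()));
  }

  // Hypothetical external call.
  private String callService(String request) {
    throw new UnsupportedOperationException("wire in the real client here");
  }
}

One extra wrinkle: Dataflow typically runs one DoFn instance per processing
thread, not per machine, so x really needs to count instances rather than
workers for the arithmetic to come out right.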
