Thanks, good to know. Ultimately I still want to lobby for a way to throttle bundles based on progress made further down the pipeline, but I realize that might involve major architectural changes.
Sometimes I'm forced to cancel a streaming pipeline and I'm unable to drain it, so that can present a problem for consumers like Kafka or Redis streams.

On Fri, Jun 18, 2021 at 10:11 AM Robert Bradshaw <[email protected]> wrote:

> For a streaming pipeline, BagState can be unlimited in size, but for batch it is held in memory.
>
> You can also use keys (followed by a GBK or Stateful DoFn) to put an upper bound on the total concurrency of a step.
>
> On Thu, Jun 17, 2021 at 4:54 PM Vincent Marquez <[email protected]> wrote:
>
>> Do individual stages of a Beam job exhibit backpressure to the consumer, though? I would think buffering elements with Beam's BagState might lead to OOM errors on the workers if the consumer IO continues to feed in data. Or does something else happen?
>>
>> --Vincent
>>
>> On Thu, Jun 17, 2021 at 11:42 AM Luke Cwik <[email protected]> wrote:
>>
>>> If the service returns sensible throttling errors, you could use a StatefulDoFn to buffer elements that error out due to throttling from the service, instead of failing the bundle, and schedule a timer to replay them. This will effectively max out the service as long as there is more data than the service can handle, which doesn't work too well if the service.
>>>
>>> On Fri, Apr 16, 2021 at 6:20 PM Daniel Thevessen <[email protected]> wrote:
>>>
>>>> Thanks for the quick response.
>>>> Querying the Dataflow API seems like something that could break easily, but I can go with that if it turns out to be easier.
>>>>
>>>> The Splittable DoFn way sounds interesting, but I'm not very familiar with it, so I have some questions:
>>>> Splits seem to operate on offsets within a single element. Does that mean I'd set a fixed shard number x, and would then first need to group my PCollection of single elements into a PCollection of lists, each of size x? And are the subsequent writes also limited to x workers, meaning that splits have the same issue as a GroupByKey?
>>>> I see that UnboundedCountingSource gets a `desiredNumSplits` parameter. I'm assuming there is nothing similar that would allow a Splittable DoFn to simply figure out the number of workers, even if it changes? That's probably very hacky anyway.
>>>> If the above solution with fixed-size lists makes sense and will redistribute the writes, I'm already happy; I don't necessarily need the throttling step to dynamically match autoscaling.
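
A minimal sketch of the buffer-and-replay idea Luke describes above, using the Beam Java SDK. BufferAndReplayFn is an illustrative name, callService(...) is a hypothetical client call that returns false when the downstream service responds with a throttling error (e.g. HTTP 429), and the 30-second retry delay is arbitrary:

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

// Stateful DoFn: elements rejected with a throttling error are parked in BagState
// and replayed by a processing-time timer instead of failing the bundle.
class BufferAndReplayFn extends DoFn<KV<String, String>, String> {

  @StateId("pending")
  private final StateSpec<BagState<String>> pendingSpec = StateSpecs.bag();

  @TimerId("retry")
  private final TimerSpec retrySpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void processElement(
      @Element KV<String, String> element,
      @StateId("pending") BagState<String> pending,
      @TimerId("retry") Timer retry,
      OutputReceiver<String> out) {
    if (callService(element.getValue())) {
      out.output(element.getValue());
    } else {
      // Service said "slow down": park the element and try again later.
      pending.add(element.getValue());
      retry.offset(Duration.standardSeconds(30)).setRelative();
    }
  }

  @OnTimer("retry")
  public void onRetry(
      @StateId("pending") BagState<String> pending,
      @TimerId("retry") Timer retry,
      OutputReceiver<String> out) {
    List<String> stillThrottled = new ArrayList<>();
    for (String payload : pending.read()) {
      if (stillThrottled.isEmpty() && callService(payload)) {
        out.output(payload);
      } else {
        // Stop hammering the service once it throttles us again.
        stillThrottled.add(payload);
      }
    }
    pending.clear();
    if (!stillThrottled.isEmpty()) {
      stillThrottled.forEach(pending::add);
      retry.offset(Duration.standardSeconds(30)).setRelative();
    }
  }

  // Hypothetical RPC; returns false on a throttling response.
  private boolean callService(String payload) {
    return true;
  }
}
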
>>>> On Thu, Apr 15, 2021 at 4:20 PM Pablo Estrada <[email protected]> wrote:
>>>>
>>>>> You could implement a Splittable DoFn that generates a limited number of splits. We do something like this for GenerateSequence.from(X).withRate(...) via UnboundedCountingSource [1]. It keeps track of its local EPS and generates new splits if more EPS is wanted. This should help you scale up to the maximum EPS that you want, and autoscaling will only produce the appropriate number of workers for that number of splits.
>>>>>
>>>>> - The only issue may be that you can't "scale down" if you find that some of your splits have very low throughput, because two splits can't be merged back together (does that make sense?) - but Dataflow should be able to scale down and schedule multiple splits on a single worker if that's the case.
>>>>>
>>>>> The UnboundedCountingSource is a Source, so it can't have an input (and it's deprecated), but you could write a SplittableDoFn that has the same behavior. Do you think this could work?
>>>>>
>>>>> [1] https://github.com/apache/beam/blob/8c9605f224115507912cf72e02d3fa94905548ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java#L334-L348
>>>>>
>>>>> On Thu, Apr 15, 2021 at 4:11 PM Evan Galpin <[email protected]> wrote:
>>>>>
>>>>>> Could you possibly use a side input with fixed interval triggering [1] to query the Dataflow API and get the most recent log statement about scaling, as suggested here [2]?
>>>>>>
>>>>>> [1] https://beam.apache.org/documentation/patterns/side-inputs/
>>>>>> [2] https://stackoverflow.com/a/54406878/6432284
>>>>>>
>>>>>> On Thu, Apr 15, 2021 at 18:14 Daniel Thevessen <[email protected]> wrote:
>>>>>>
>>>>>>> Hi folks,
>>>>>>>
>>>>>>> I've been working on a custom PTransform that makes requests to another service and would like to add a rate-limiting feature there. The fundamental issue I'm running into is that I need a decent heuristic to estimate the worker count, so that each worker can independently set a limit that globally comes out to the right value. All of this is easy if I know how many machines I have, but I'd like to use Dataflow's autoscaling, which would easily break any pre-configured value.
>>>>>>>
>>>>>>> I have seen two main approaches for rate limiting, both for a configurable variable x:
>>>>>>>
>>>>>>> - Simply assume the worker count is x, then divide by x to figure out the "local" limit. The issue I have here is that if we assume x is 500 but it is actually 50, I'm now paying for 50 nodes to throttle 10 times as much as necessary. I know the pipeline options have a reference to the runner; is it possible to get an approximate current worker count from that at bundle start (*if* the runner is DataflowRunner)?
>>>>>>> - Add another PTransform in front of the API requests, which groups by x number of keys, throttles, and keeps forwarding elements with an instant trigger. I initially really liked this solution because even if x is misconfigured, I will have at most x workers running and throttle appropriately. However, I noticed that for batch pipelines this effectively also caps the API request stage at x workers. If I throw in a `Reshuffle`, there is another GroupByKey (-> another stage), and nothing gets done until every element has passed through the throttler.
>>>>>>>
>>>>>>> Has anyone here tried to figure out rate limiting with Beam before, and perhaps run into similar issues? I would love to know if there is a preferred solution to this type of problem.
>>>>>>> I know sharing state like that runs a little counter to the Beam pipeline paradigm, but really all I need is an approximate worker count with few guarantees.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Daniel
>>>>
>>>> --
>>>> Daniel Thevessen | Site Reliability Engineer
>>>> Firestore SRE
>>>> San Francisco, CA, USA | +1 (415) 373-7762
>>
>> ~Vincent
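
And a sketch of the fixed-key throttling that Robert's keys suggestion and Daniel's second approach point at: spread elements over a fixed number of shard keys, then rate-limit each shard, so the global rate is roughly numShards * perShardRate regardless of how many workers autoscaling adds. ThrottledWrites, PerShardThrottleFn, numShards, and perShardRatePerSec are illustrative names; it assumes Guava's RateLimiter is on the classpath, and the limit is only approximate because each worker JVM keeps its own limiters:

import com.google.common.util.concurrent.RateLimiter; // assumes Guava is available
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

// Spreads elements over a fixed number of shard keys, then throttles each shard.
// The stateful ParDo processes a given key serially, so at most numShards elements
// are in flight at once no matter how far Dataflow scales out.
class ThrottledWrites extends PTransform<PCollection<String>, PCollection<String>> {
  private final int numShards;
  private final double perShardRatePerSec;

  ThrottledWrites(int numShards, double perShardRatePerSec) {
    this.numShards = numShards;
    this.perShardRatePerSec = perShardRatePerSec;
  }

  @Override
  public PCollection<String> expand(PCollection<String> input) {
    final int shards = numShards; // avoid capturing the transform in the lambda
    return input
        .apply(WithKeys.of((String e) -> ThreadLocalRandom.current().nextInt(shards))
            .withKeyType(TypeDescriptors.integers()))
        .apply(ParDo.of(new PerShardThrottleFn(perShardRatePerSec)));
  }

  static class PerShardThrottleFn extends DoFn<KV<Integer, String>, String> {
    // Declared only to make this DoFn stateful (per-key serial execution);
    // the rate limiters themselves live in the per-JVM map below.
    @StateId("unused")
    private final StateSpec<ValueState<Long>> unusedSpec = StateSpecs.value();

    private static final Map<Integer, RateLimiter> LIMITERS = new ConcurrentHashMap<>();
    private final double perShardRatePerSec;

    PerShardThrottleFn(double perShardRatePerSec) {
      this.perShardRatePerSec = perShardRatePerSec;
    }

    @ProcessElement
    public void processElement(
        @Element KV<Integer, String> element,
        @StateId("unused") ValueState<Long> unused, // forces per-key serial processing
        OutputReceiver<String> out) {
      LIMITERS
          .computeIfAbsent(element.getKey(), k -> RateLimiter.create(perShardRatePerSec))
          .acquire(); // blocks until this shard has a permit
      out.output(element.getValue());
    }
  }
}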
