Thanks, good to know.  Ultimately I still want to lobby for a way to
throttle bundles based on progress made further down the pipeline but I
realize that might involve major architectural changes.

Sometimes I'm forced to cancel a streaming pipeline without being able to
drain it, which can be a problem for consumers like Kafka or Redis
streams.


On Fri, Jun 18, 2021 at 10:11 AM Robert Bradshaw <[email protected]>
wrote:

> For a Streaming pipeline, BagState can be unlimited in size, but for Batch
> it is held in memory.
>
> You can also use keys (followed by a GBK or Stateful DoFn) to put an upper
> bound on the total concurrency of a step.
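
A rough sketch of what that keyed cap could look like (not from the thread; it
assumes String elements, and the cap only applies to whatever DoFn consumes the
grouped output):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

/** Caps the parallelism of whatever consumes the grouped output at numKeys. */
public class LimitConcurrency
    extends PTransform<PCollection<String>, PCollection<KV<Integer, Iterable<String>>>> {

  private final int numKeys;  // upper bound on concurrent downstream bundles

  public LimitConcurrency(int numKeys) {
    this.numKeys = numKeys;
  }

  @Override
  public PCollection<KV<Integer, Iterable<String>>> expand(PCollection<String> input) {
    final int keys = numKeys;  // effectively final copy captured by the anonymous DoFn
    return input
        .apply("AssignOneOfNKeys", ParDo.of(new DoFn<String, KV<Integer, String>>() {
          @ProcessElement
          public void process(@Element String element, OutputReceiver<KV<Integer, String>> out) {
            // Math.floorMod keeps the key non-negative even for negative hash codes.
            out.output(KV.of(Math.floorMod(element.hashCode(), keys), element));
          }
        }))
        // After the GroupByKey there are at most `keys` distinct keys, so at most
        // that many instances of the downstream write/throttle DoFn run at once.
        .apply(GroupByKey.<Integer, String>create());
  }
}
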
>
> On Thu, Jun 17, 2021 at 4:54 PM Vincent Marquez <[email protected]>
> wrote:
>
>>
>>
>> Do individual stages of a Beam job exhibit backpressure to the consumer
>> though?  I would think buffering elements with Beam's BagState might lead
>> to OOM errors on the workers if the consumerIO continues to feed in data.
>> Or does something else happen?
>>
>> --Vincent
>>
>>
>> On Thu, Jun 17, 2021 at 11:42 AM Luke Cwik <[email protected]> wrote:
>>
>>> If the service returns sensible throttling errors, you could use a
>>> StatefulDoFn and buffer elements that error out due to throttling from the
>>> service, instead of failing the bundle, and schedule a timer to replay them.
>>> This will effectively max out the service as long as there is more data
>>> than the service can handle, which doesn't work too well if the service.
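
A rough sketch of that buffer-and-replay idea (assuming the input is already
keyed, since state and timers are per key and window; callService() and the
fixed 30-second retry delay are placeholders, not a real client):

import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

public class ThrottleAwareWriteFn extends DoFn<KV<Integer, String>, Void> {

  @StateId("retryBuffer")
  private final StateSpec<BagState<String>> retryBufferSpec = StateSpecs.bag();

  @TimerId("retry")
  private final TimerSpec retrySpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  /** Hypothetical client call; assumed to throw an unchecked exception when the
   *  external service responds with a throttling error (e.g. HTTP 429). */
  private void callService(String value) {}

  @ProcessElement
  public void process(
      @Element KV<Integer, String> element,
      @StateId("retryBuffer") BagState<String> buffer,
      @TimerId("retry") Timer retry) {
    try {
      callService(element.getValue());
    } catch (RuntimeException throttled) {
      // Buffer instead of failing the bundle, and replay a bit later.
      buffer.add(element.getValue());
      retry.offset(Duration.standardSeconds(30)).setRelative();
    }
  }

  @OnTimer("retry")
  public void onRetry(
      @StateId("retryBuffer") BagState<String> buffer,
      @TimerId("retry") Timer retry) {
    boolean throttledAgain = false;
    for (String value : buffer.read()) {
      try {
        callService(value);
      } catch (RuntimeException throttled) {
        throttledAgain = true;  // crude: keep the whole bag and try again later
      }
    }
    if (throttledAgain) {
      retry.offset(Duration.standardSeconds(30)).setRelative();
    } else {
      buffer.clear();
    }
  }
}
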
>>>
>>>
>>> On Fri, Apr 16, 2021 at 6:20 PM Daniel Thevessen <[email protected]>
>>> wrote:
>>>
>>>> Thanks for the quick response.
>>>> Querying the Dataflow API seems like something that could break easily,
>>>> but I can go with that if it turns out to be easier.
>>>>
>>>> The Splittable DoFn way sounds interesting, but I'm not very familiar
>>>> with that so I have some questions around it:
>>>> Splits seem to operate on offsets within a single element. Does that
>>>> mean that I'd set a fixed shard number x, and then I'd need to first group
>>>> my PCollection of single elements into a PCollection of lists, each of size x?
>>>> And are the subsequent writes also limited to x workers, meaning that
>>>> splits have the same issue as with a GroupByKey?
>>>> I see the UnboundedCountingSource gets a `desiredNumSplits` parameter.
>>>> I'm assuming there is nothing similar that would allow a Splittable DoFn to
>>>> simply figure out the number of workers even if it changes? That's probably
>>>> very hacky anyway.
>>>> If the above solution with fixed-size lists makes sense and will
>>>> redistribute the writes I'm already happy, I don't necessarily need to have
>>>> the throttling step dynamically match autoscaling.
>>>>
>>>> On Thu, Apr 15, 2021 at 4:20 PM Pablo Estrada <[email protected]>
>>>> wrote:
>>>>
>>>>> You could implement a Splittable DoFn that generates a limited number
>>>>> of splits. We do something like this for
>>>>> GenerateSequence.from(X).withRate(...) via UnboundedCountingSource[1]. It
>>>>> keeps track of its local EPS (elements per second) and generates new splits
>>>>> if more EPS is wanted. This should help you scale up to the maximum EPS that
>>>>> you want, and autoscaling will only produce the appropriate number of
>>>>> workers for that number of splits.
>>>>>
>>>>> - The only issue may be that you can't "scale down" if you find that
>>>>> some of your splits have a very low throughput, because two splits can't
>>>>> be merged back together (does that make sense?) - but Dataflow should be
>>>>> able to scale down and schedule multiple splits in a single worker if
>>>>> that's the case.
>>>>>
>>>>> The UnboundedCountingSource is a Source, so it can't have an input
>>>>> (and it's deprecated), but you could write a SplittableDoFn that has the
>>>>> same behavior. Do you think this could work?
>>>>>
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/beam/blob/8c9605f224115507912cf72e02d3fa94905548ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java#L334-L348
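
For what it's worth, a batch-flavoured sketch of a Splittable DoFn with a
capped number of splits could look like the code below. This is not the
UnboundedCountingSource code: the pre-batched List input, the maxSplitsPerBatch
knob, and the callService() comment are all assumptions, and a runner may still
split restrictions further at runtime, so this bounds the initial fan-out
rather than giving a hard guarantee.

import java.util.List;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

@DoFn.BoundedPerElement
public class BoundedSplitWriteFn extends DoFn<List<String>, String> {

  private final int maxSplitsPerBatch;  // assumed knob: cap on initial pieces per batch

  public BoundedSplitWriteFn(int maxSplitsPerBatch) {
    this.maxSplitsPerBatch = maxSplitsPerBatch;
  }

  @GetInitialRestriction
  public OffsetRange initialRestriction(@Element List<String> batch) {
    return new OffsetRange(0, batch.size());
  }

  @SplitRestriction
  public void splitRestriction(
      @Restriction OffsetRange range, OutputReceiver<OffsetRange> out) {
    long size = range.getTo() - range.getFrom();
    // Ceiling division so we hand out at most maxSplitsPerBatch initial pieces.
    long perSplit = Math.max(1, (size + maxSplitsPerBatch - 1) / maxSplitsPerBatch);
    for (OffsetRange piece : range.split(perSplit, perSplit)) {
      out.output(piece);
    }
  }

  @ProcessElement
  public void process(
      @Element List<String> batch,
      RestrictionTracker<OffsetRange, Long> tracker,
      OutputReceiver<String> out) {
    for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
      // callService(batch.get((int) i));  // hypothetical throttled request
      out.output(batch.get((int) i));
    }
  }
}
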
>>>>>
>>>>> On Thu, Apr 15, 2021 at 4:11 PM Evan Galpin <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Could you possibly use a side input with fixed interval triggering[1]
>>>>>> to query the Dataflow API to get the most recent log statement of scaling
>>>>>> as suggested here[2]?
>>>>>>
>>>>>> [1]
>>>>>> https://beam.apache.org/documentation/patterns/side-inputs/
>>>>>> [2]
>>>>>> https://stackoverflow.com/a/54406878/6432284
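
A rough sketch of that slowly-updating side input pattern from [1], with the
actual Dataflow API lookup (per [2]) left as a hypothetical fetchWorkerCount()
stub:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Latest;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;

public class WorkerCountSideInput {

  /** Hypothetical helper: would query the Dataflow API (or read the autoscaling
   *  log entry from [2]) and return the current worker count. */
  static Integer fetchWorkerCount() {
    return 1;
  }

  /** A singleton side input that is refreshed every five minutes. */
  public static PCollectionView<Integer> workerCountView(Pipeline p) {
    return p
        .apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(5)))
        .apply(
            Window.<Long>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                .discardingFiredPanes()
                .withAllowedLateness(Duration.ZERO))
        .apply("FetchWorkerCount", ParDo.of(new DoFn<Long, Integer>() {
          @ProcessElement
          public void process(OutputReceiver<Integer> out) {
            out.output(fetchWorkerCount());
          }
        }))
        .apply(Latest.globally())
        .apply(View.asSingleton());
  }
}

The throttling DoFn would then declare workerCountView as a side input and read
it in @ProcessElement to recompute its local share of the global rate.
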
>>>>>>
>>>>>> On Thu, Apr 15, 2021 at 18:14 Daniel Thevessen <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi folks,
>>>>>>>
>>>>>>> I've been working on a custom PTransform that makes requests to
>>>>>>> another service, and would like to add a rate limiting feature there.
>>>>>>> The fundamental issue that I'm running into here is that I need a
>>>>>>> decent heuristic to estimate the worker count, so that each worker can
>>>>>>> independently set a limit which globally comes out to the right value.
>>>>>>> All of this is easy if I know how many machines I have, but I'd like
>>>>>>> to use Dataflow's autoscaling, which would easily break any
>>>>>>> pre-configured value.
>>>>>>> I have seen two main approaches for rate limiting, both for a
>>>>>>> configurable variable x:
>>>>>>>
>>>>>>>    - Simply assume worker count is x, then divide by x to figure out
>>>>>>>    the "local" limit. The issue I have here is that if we assume x is
>>>>>>>    500, but it is actually 50, I'm now paying for 50 nodes to throttle
>>>>>>>    10 times as much as necessary. I know the pipeline options have a
>>>>>>>    reference to the runner, is it possible to get an approximate
>>>>>>>    current worker count from that at bundle start (*if* runner is
>>>>>>>    DataflowRunner)?
>>>>>>>    - Add another PTransform in front of the API requests, which groups
>>>>>>>    by x number of keys, throttles, and keeps forwarding elements with
>>>>>>>    an instant trigger. I initially really liked this solution because
>>>>>>>    even if x is misconfigured, I will have at most x workers running
>>>>>>>    and throttle appropriately. However, I noticed that for batch
>>>>>>>    pipelines, this effectively also caps the API request stage at x
>>>>>>>    workers. If I throw in a `Reshuffle`, there is another GroupByKey
>>>>>>>    (-> another stage), and nothing gets done until every element has
>>>>>>>    passed through the throttler.
>>>>>>>
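
For reference, the first approach above usually comes out to something like the
following sketch (Guava's RateLimiter is assumed to be available, callService()
is hypothetical, and the division is really per DoFn instance, which is itself
only an approximation of per-worker):

import com.google.common.util.concurrent.RateLimiter;
import org.apache.beam.sdk.transforms.DoFn;

public class LocallyRateLimitedFn extends DoFn<String, String> {

  private final double globalRequestsPerSecond;
  private final int assumedWorkers;  // the "x" above; wrong once autoscaling kicks in
  private transient RateLimiter limiter;

  public LocallyRateLimitedFn(double globalRequestsPerSecond, int assumedWorkers) {
    this.globalRequestsPerSecond = globalRequestsPerSecond;
    this.assumedWorkers = assumedWorkers;
  }

  @Setup
  public void setup() {
    // Each DoFn instance takes an equal share of the global budget.
    limiter = RateLimiter.create(globalRequestsPerSecond / assumedWorkers);
  }

  @ProcessElement
  public void process(@Element String request, OutputReceiver<String> out) {
    limiter.acquire();        // blocks until this instance's share allows another call
    // callService(request);  // hypothetical request to the external service
    out.output(request);
  }
}
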
>>>>>>> Has anyone here tried to figure out rate limiting with Beam before,
>>>>>>> and perhaps run into similar issues? I would love to know if there is a
>>>>>>> preferred solution to this type of problem.
>>>>>>> I know sharing state like that runs a little counter to the Beam
>>>>>>> pipeline paradigm, but really all I need is an approximate worker count
>>>>>>> with few guarantees.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Daniel
>>>>>>>
>>>>>>
>>>>
>>>> --
>>>>
>>>> Daniel Thevessen  |  Site Reliability Engineer
>>>>
>>>> Firestore SRE
>>>> San Francisco, CA, USA |  +1 (415) 373-7762
>>>>
>>>
>> ~Vincent
>>
>
