Hello Christian,

The challenge with generic backpressure for remote functions is that
StateFun doesn't know whether it targets a single process or a fleet of
processes behind a load balancer and an autoscaler.
Triggering backpressure too early might prevent the autoscaling from ever
kicking in.

Indeed, the parameter you found triggers backpressure when the total
number of in-flight requests per task slot exceeds that value. There is an
additional parameter, maxNumBatchRequests, that triggers backpressure per
function address and is therefore more fine-grained than the per-task-slot
parameter. Reducing this value might be advisable if the total processing
time of a single message is potentially high (CPU-intensive work or long
I/O).
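For reference, this is roughly where that setting lives in a remote module
definition. This is only a sketch: the function namespace, URL, and value
below are placeholders, and the exact spec keys can differ between StateFun
versions, so please check the docs for your release.

```yaml
# module.yaml (sketch; namespace, URL, and value are placeholders)
kind: io.statefun.endpoints.v2/http
spec:
  functions: example/*
  urlPathTemplate: http://my-functions:8000/statefun
  # Backpressure per function address: the maximum number of
  # uncompleted batch requests allowed before that address blocks.
  maxNumBatchRequests: 500
```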

I think this use case is valid, and we need to think about the case
where the set of remote functions is static (up to a manual scale-up).
I don't have a good idea at the moment, as deciding whether or not to
apply backpressure requires some sort of global knowledge.

What I would recommend, if it fits your infra, is to consider an
autoscaler for the remote functions driven by a metric that makes sense to
you, and to use the max-in-flight parameter as a high safety net.
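Concretely, the safety net would be the parameter you already found, kept
at a generous value so it only bites when the autoscaler can't keep up
(the number below is just an example):

```yaml
# flink-conf.yaml (sketch; the value is an example)
# Global cap on in-flight async operations per task slot.
# With an autoscaler handling normal load, this acts only as
# a hard upper bound against overload.
statefun.async.max-per-task: 1024
```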

Cheers,
Igal

On Thu 7. Oct 2021 at 14:03, Christian Krudewig (Corporate Development) <
christian.krude...@dpdhl.com> wrote:

> Hello fellow Flink users,
>
> How do you create backpressure with Statefun remote functions? I'm using an
> asynchronous web server for the remote function (Python aiohttp on uvicorn)
> which accepts more requests than its CPU bound backend can handle. That can
> make the requests time out and can trigger a restart loop of the whole Flink
> pipeline. Of course, this only happens when so many requests are coming into
> the ingress Kafka stream that the service cannot handle them anymore.
>
> Desired behavior: Flink only consumes as many records from the input stream
> as the pipeline can handle instead of overloading the remote functions.
>
> What I tried so far:
> 1. Set "statefun.async.max-per-task" in flink-conf.yaml to a low number. This
> works, but it is one global static config for all functions, which cannot be
> changed without restarting the cluster when the remote functions are scaled
> up or down.
> 2. Add concurrency limiting to the remote function service. If the function
> service returns failure codes (500, 503, 429), that doesn't seem to create
> backpressure but is handled like a normal failure by Flink, with retries
> until finally the whole pipeline gets restarted.
> 3. Try the new "io.statefun.transports.v1/async" transport type for the
> endpoints with a low "pool_size" parameter. But reaching the pool size seems
> to be treated like an error instead of creating backpressure. Same effect as
> option 2.
>
> Is there some other option? How should it be done by design?
>
> Thanks,
>
> Christian
