Hi Salva,

I'm not sure, but I think you cannot access the state (especially the
keyed state) from within the metric, because metrics are evaluated outside
of the keyed context, and from another thread as well. Also, state
primitives like `ValueState`/`MapState` do not expose their size.

So you would probably have to follow Kezhu's suggestion: whenever you
update your state value, also update a shared variable that tracks the
combined size (an `AtomicLong`?). Upon recovery you would need to
reinitialize it (maybe indeed via `KeyedStateBackend.applyToAllKeys`).
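A minimal sketch of that idea, assuming one `MapState<String, Long>` per key and a metric equal to the total number of map entries held by one parallel instance. To keep it runnable without Flink, keyed state is modeled with plain `HashMap`s, and the names `TotalSizeTracker`, `rebuildAfterRestore` and `simulateRestore` are hypothetical. In a real `CoFlatMapFunction` you would update the actual `MapState` in `flatMap1`/`flatMap2` and register the counter in `open()`, e.g. `getRuntimeContext().getMetricGroup().gauge("totalMapStateSize", (Gauge<Long>) totalEntries::get)`. How you reach `KeyedStateBackend.applyToAllKeys` from a UDF for the rebuild step is left open here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Flink-free model (hypothetical names): keyed MapState<String, Long> is simulated
// as one HashMap per key; a single AtomicLong tracks the total entry count.
class TotalSizeTracker<K> {
    // Stand-in for the state backend's per-key MapState.
    private final Map<K, Map<String, Long>> keyedState = new HashMap<>();
    // Shared by all keys of this parallel instance; safe to read from the reporter thread.
    private final AtomicLong totalEntries = new AtomicLong();

    // What the metric reads; in Flink this would be a Gauge<Long> registered in open().
    Supplier<Long> totalSizeGauge() {
        return totalEntries::get;
    }

    // Update state and counter together. Flink's MapState.put returns void,
    // so a real operator would call contains(mapKey) before put to detect new entries.
    void put(K key, String mapKey, long value) {
        Map<String, Long> state = keyedState.computeIfAbsent(key, k -> new HashMap<>());
        if (state.put(mapKey, value) == null) {
            totalEntries.incrementAndGet(); // new entry, not an overwrite
        }
    }

    void remove(K key, String mapKey) {
        Map<String, Long> state = keyedState.get(key);
        if (state != null && state.remove(mapKey) != null) {
            totalEntries.decrementAndGet(); // only count entries that actually existed
        }
    }

    // The counter is not checkpointed, so after a restore it must be recomputed by
    // visiting every key this instance owns (the job applyToAllKeys might do in Flink).
    void rebuildAfterRestore() {
        long total = 0;
        for (Map<String, Long> state : keyedState.values()) {
            total += state.size();
        }
        totalEntries.set(total);
    }

    // Illustration only: state survives a restore, the in-memory counter does not.
    void simulateRestore() {
        totalEntries.set(0);
    }
}
```

The counter only moves when a map actually gains or loses an entry, so overwrites and removals of missing keys do not make it drift. It is also per parallel subtask; summing across subtasks would be up to the metrics reporter or dashboard.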

Piotrek



On Wed, 17 Feb 2021 at 14:13, Kezhu Wang <kez...@gmail.com> wrote:

> Starting from an initial `y`, I think you could compute the new `y` on each
> new stream value. Upon recovering from a checkpoint, maybe
> `KeyedStateBackend.applyToAllKeys` could help you rebuild the initial `y`.
>
> Best,
> Kezhu Wang
>
> On February 17, 2021 at 13:09:39, Salva Alcántara (salcantara...@gmail.com)
> wrote:
>
> I wonder what the canonical way is to accomplish the following:
>
> Given a Flink UDF, how can I report a metric `y` which is a function of some
> (keyed) state `X`? That is, `y=f(X)`. Most commonly, we are interested in
> the size of the state `X`.
>
> For instance, consider a `CoFlatMap` function with:
>
> - `X` being a `MapState`
> - `y` (the metric) consisting of the aggregated size (i.e., the total size
> of the `MapState`, for all keys)
>
