Hi Rinat,

First of all, sorry for nitpicking at the start, but your message might be a bit misleading for some readers. If I understood it correctly, you are referring to metrics, not accumulators, which are a different concept[1]. Or were you indeed referring to accumulators?

Now on to the topic of accessing metrics. Personally, I don't think
exposing metrics through the ProcessFunctionTestHarness is the right
way to test this. The harness should primarily be used as a minimal
execution environment for testing operators and behaviours such as
checkpointing. I would not recommend using it for testing business
logic, and most definitely not metrics. I'd either test that in an IT
test using a MiniCluster and a metric reporter you can assert against,
or I'd separate the business logic from the setup logic. Something like:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Counter;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    // Both classes are meant to be nested inside your job (or test) class.
    private static class MyProcessFunction<IN, OUT> extends ProcessFunction<IN, OUT> {

        private MyLogic<IN, OUT> logic;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // Setup only: register the counter and hand it to the logic.
            this.logic = new MyLogic<>(
                    getRuntimeContext().getMetricGroup().counter("my-counter"));
        }

        @Override
        public void processElement(IN value, Context ctx, Collector<OUT> out)
                throws Exception {
            out.collect(logic.doMyBusinessLogic(value));
        }
    }

    private static class MyLogic<IN, OUT> {

        private final Counter counter;

        public MyLogic(Counter counter) {
            this.counter = counter;
        }

        public OUT doMyBusinessLogic(IN value) {
            counter.inc();
            // do the processing and return the result
            throw new UnsupportedOperationException("placeholder in this sketch");
        }
    }

That way you can easily test your MyLogic class including interactions
with the counter, by passing a mock counter.
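
To make that a bit more concrete, here is a minimal sketch of such a test. It deliberately does not depend on Flink, so several names are assumptions of mine: the small Counter interface is only a stand-in for org.apache.flink.metrics.Counter, RecordingCounter is a hand-written test double, and the concrete MyLogic (String in, Integer out) is a hypothetical example of business logic.

```java
public class MyLogicTest {

    /** Stand-in for org.apache.flink.metrics.Counter so the sketch compiles without Flink. */
    interface Counter {
        void inc();
        void inc(long n);
        void dec();
        void dec(long n);
        long getCount();
    }

    /** Hand-written test double that simply records the current count. */
    static class RecordingCounter implements Counter {
        private long count;

        @Override public void inc() { count++; }
        @Override public void inc(long n) { count += n; }
        @Override public void dec() { count--; }
        @Override public void dec(long n) { count -= n; }
        @Override public long getCount() { return count; }
    }

    /** Hypothetical business logic: maps a string to its length and counts each call. */
    static class MyLogic {
        private final Counter counter;

        MyLogic(Counter counter) {
            this.counter = counter;
        }

        Integer doMyBusinessLogic(String value) {
            counter.inc();
            return value.length();
        }
    }

    public static void main(String[] args) {
        RecordingCounter counter = new RecordingCounter();
        MyLogic logic = new MyLogic(counter);

        Integer first = logic.doMyBusinessLogic("flink");
        Integer second = logic.doMyBusinessLogic("");

        // Check the results and the interaction with the counter.
        if (first != 5 || second != 0) {
            throw new AssertionError("unexpected result");
        }
        if (counter.getCount() != 2) {
            throw new AssertionError("counter should have been incremented twice");
        }
        System.out.println("count = " + counter.getCount());
    }
}
```

In a real project you would drop the stand-in interface and use Flink's own Counter. Passing org.apache.flink.metrics.SimpleCounter or a mock from your mocking library should work just as well as a hand-written double.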
Best,
Dawid
[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/user_defined_functions.html#accumulators--counters
On 27/10/2020 08:02, Sharipov, Rinat wrote:
> Hi mates !
>
> I guess that I'm doing something wrong, but I couldn't find a way to
> access registered accumulators and their values
> via the org.apache.flink.streaming.util.ProcessFunctionTestHarness
> function wrapper that I'm using to test my functions.
>
> While reading the code I found that the required data is stored in the
> org.apache.flink.runtime.metrics.groups.AbstractMetricGroup#metrics
> field, which is private and not accessible from tests. It's obvious
> that Flink somehow accesses this field and exposes counters in its
> Web UI.
>
> So I guess that someone can help me to add a check into my Unit Tests
> for metrics counting or in case if there is no such ability I'm ready
> to help to implement it if the community considers this acceptable.
>
> Thx !
