Dear Yuxia,

thank you for your answer! This is also our conclusion and my colleague has 
already proposed this feature.

Best regards,
Tobias


----- Ursprüngliche Mail -----
Von: "yuxia" <luoyu...@alumni.sjtu.edu.cn>
An: "Dr. Tobias Fröhlich" <tobias.froehl...@scoop-software.de>
CC: "User" <user@flink.apache.org>, "dev" <d...@flink.apache.org>
Gesendet: Montag, 20. Februar 2023 03:31:22
Betreff: Re: Metrics or runtimeContext in global commit

It seems no other way to get the runtimeContext in a global commit. For me, I 
think it's reasoable to propose the fetature. 
I added flink-devs channel for more attention/discussion in flink devs.

Best regards,
Yuxia

----- 原始邮件 -----
发件人: "Tobias Fröhlich" <tobias.froehl...@scoop-software.de>
收件人: "User" <user@flink.apache.org>
发送时间: 星期二, 2023年 2 月 14日 下午 9:26:34
主题: Metrics or runtimeContext in global commit

Dear flink team,

I would like to use metrics (which are then written to an influxdb) in the 
method 
org.apache.flink.api.connector.sink2.Committer::commit(Collection<CommitRequest<CommT>>
 committables) that I use for global commit. I use the helper method 
StandardSinkTopologies.addGlobalCommitter(...) to define the post-commit 
topology.

The problem is: When I implement the interface Committer, I cannot get the 
runtimeContext that I need for the metrics, because it is not an Operator.

The only solution I found was by cloning the flink source code and amending it 
in the following way:

 1. declaring an abstract class "CommitterWithRuntimeContext<CommT>" that 
implements Committer<CommT> and has:
    - an additional field for the runtimeContext
    - setter and getter for this field
    - an abstract method "void init()"

 2. in the setup() method of GlobalCommitterOperator (which is an operator and 
thus has a runtimeContext) adding the following lines at the end:

    if (committer instanceof  CommitterWithRuntimeContext) {
        ((CommitterWithRuntimeContext<CommT>) 
committer).setRuntimeContext(getRuntimeContext());
        ((CommitterWithRuntimeContext<CommT>) committer).init();
    }

I can then implement the method CommitterWithRuntimeContext::init() in our code 
and call the method CommitterWithRuntimeContext::getRuntimeContext() when I 
need the runtimeContext.

Is there another way to get the runtimeContext in a global commit? If not, is 
it justified to propose a feature request for a future release, where the 
global commit method can be implemented in a way that the user has access to 
the runtimeContext?

Best regards and thanks in advance
Tobias Fröhlich

Reply via email to