Re: Metrics or runtimeContext in global commit

2023-02-20 Thread Tobias Fröhlich
Dear Yuxia,

thank you for your answer! This is also our conclusion and my colleague has 
already proposed this feature.

Best regards,
Tobias


- Ursprüngliche Mail -
Von: "yuxia" 
An: "Dr. Tobias Fröhlich" 
CC: "User" , "dev" 
Gesendet: Montag, 20. Februar 2023 03:31:22
Betreff: Re: Metrics or runtimeContext in global commit

It seems no other way to get the runtimeContext in a global commit. For me, I 
think it's reasoable to propose the fetature. 
I added flink-devs channel for more attention/discussion in flink devs.

Best regards,
Yuxia

- 原始邮件 -
发件人: "Tobias Fröhlich" 
收件人: "User" 
发送时间: 星期二, 2023年 2 月 14日 下午 9:26:34
主题: Metrics or runtimeContext in global commit

Dear flink team,

I would like to use metrics (which are then written to an influxdb) in the 
method 
org.apache.flink.api.connector.sink2.Committer::commit(Collection>
 committables) that I use for global commit. I use the helper method 
StandardSinkTopologies.addGlobalCommitter(...) to define the post-commit 
topology.

The problem is: When I implement the interface Committer, I cannot get the 
runtimeContext that I need for the metrics, because it is not an Operator.

The only solution I found was by cloning the flink source code and amending it 
in the following way:

 1. declaring an abstract class "CommitterWithRuntimeContext" that 
implements Committer and has:
- an additional field for the runtimeContext
- setter and getter for this field
- an abstract method "void init()"

 2. in the setup() method of GlobalCommitterOperator (which is an operator and 
thus has a runtimeContext) adding the following lines at the end:

if (committer instanceof  CommitterWithRuntimeContext) {
((CommitterWithRuntimeContext) 
committer).setRuntimeContext(getRuntimeContext());
((CommitterWithRuntimeContext) committer).init();
}

I can then implement the method CommitterWithRuntimeContext::init() in our code 
and call the method CommitterWithRuntimeContext::getRuntimeContext() when I 
need the runtimeContext.

Is there another way to get the runtimeContext in a global commit? If not, is 
it justified to propose a feature request for a future release, where the 
global commit method can be implemented in a way that the user has access to 
the runtimeContext?

Best regards and thanks in advance
Tobias Fröhlich


Re: Metrics or runtimeContext in global commit

2023-02-19 Thread yuxia
It seems no other way to get the runtimeContext in a global commit. For me, I 
think it's reasoable to propose the fetature. 
I added flink-devs channel for more attention/discussion in flink devs.

Best regards,
Yuxia

- 原始邮件 -
发件人: "Tobias Fröhlich" 
收件人: "User" 
发送时间: 星期二, 2023年 2 月 14日 下午 9:26:34
主题: Metrics or runtimeContext in global commit

Dear flink team,

I would like to use metrics (which are then written to an influxdb) in the 
method 
org.apache.flink.api.connector.sink2.Committer::commit(Collection>
 committables) that I use for global commit. I use the helper method 
StandardSinkTopologies.addGlobalCommitter(...) to define the post-commit 
topology.

The problem is: When I implement the interface Committer, I cannot get the 
runtimeContext that I need for the metrics, because it is not an Operator.

The only solution I found was by cloning the flink source code and amending it 
in the following way:

 1. declaring an abstract class "CommitterWithRuntimeContext" that 
implements Committer and has:
- an additional field for the runtimeContext
- setter and getter for this field
- an abstract method "void init()"

 2. in the setup() method of GlobalCommitterOperator (which is an operator and 
thus has a runtimeContext) adding the following lines at the end:

if (committer instanceof  CommitterWithRuntimeContext) {
((CommitterWithRuntimeContext) 
committer).setRuntimeContext(getRuntimeContext());
((CommitterWithRuntimeContext) committer).init();
}

I can then implement the method CommitterWithRuntimeContext::init() in our code 
and call the method CommitterWithRuntimeContext::getRuntimeContext() when I 
need the runtimeContext.

Is there another way to get the runtimeContext in a global commit? If not, is 
it justified to propose a feature request for a future release, where the 
global commit method can be implemented in a way that the user has access to 
the runtimeContext?

Best regards and thanks in advance
Tobias Fröhlich


Metrics or runtimeContext in global commit

2023-02-14 Thread Tobias Fröhlich
Dear flink team,

I would like to use metrics (which are then written to an influxdb) in the 
method 
org.apache.flink.api.connector.sink2.Committer::commit(Collection>
 committables) that I use for global commit. I use the helper method 
StandardSinkTopologies.addGlobalCommitter(...) to define the post-commit 
topology.

The problem is: When I implement the interface Committer, I cannot get the 
runtimeContext that I need for the metrics, because it is not an Operator.

The only solution I found was by cloning the flink source code and amending it 
in the following way:

 1. declaring an abstract class "CommitterWithRuntimeContext" that 
implements Committer and has:
- an additional field for the runtimeContext
- setter and getter for this field
- an abstract method "void init()"

 2. in the setup() method of GlobalCommitterOperator (which is an operator and 
thus has a runtimeContext) adding the following lines at the end:

if (committer instanceof  CommitterWithRuntimeContext) {
((CommitterWithRuntimeContext) 
committer).setRuntimeContext(getRuntimeContext());
((CommitterWithRuntimeContext) committer).init();
}

I can then implement the method CommitterWithRuntimeContext::init() in our code 
and call the method CommitterWithRuntimeContext::getRuntimeContext() when I 
need the runtimeContext.

Is there another way to get the runtimeContext in a global commit? If not, is 
it justified to propose a feature request for a future release, where the 
global commit method can be implemented in a way that the user has access to 
the runtimeContext?

Best regards and thanks in advance
Tobias Fröhlich