Re: Metrics or runtimeContext in global commit

2023-02-20 Thread Tobias Fröhlich
Dear Yuxia,

thank you for your answer! This is also our conclusion and my colleague has 
already proposed this feature.

Best regards,
Tobias


- Original Message -
From: "yuxia" 
To: "Dr. Tobias Fröhlich" 
CC: "User" , "dev" 
Sent: Monday, 20 February 2023 03:31:22
Subject: Re: Metrics or runtimeContext in global commit

It seems there is no other way to get the runtimeContext in a global commit. I 
think it's reasonable to propose the feature. 
I added the flink-devs channel for more attention and discussion among the Flink devs.

Best regards,
Yuxia

- Original Message -
From: "Tobias Fröhlich" 
To: "User" 
Sent: Tuesday, 14 February 2023 9:26:34 PM
Subject: Metrics or runtimeContext in global commit

Dear flink team,

I would like to use metrics (which are then written to an InfluxDB) in the 
method 
org.apache.flink.api.connector.sink2.Committer::commit(Collection<CommitRequest<CommT>> committables) 
that I use for global commit. I use the helper method 
StandardSinkTopologies.addGlobalCommitter(...) to define the post-commit 
topology.

The problem is: When I implement the interface Committer, I cannot get the 
runtimeContext that I need for the metrics, because it is not an Operator.

The only solution I found was by cloning the flink source code and amending it 
in the following way:

 1. declaring an abstract class "CommitterWithRuntimeContext" that 
implements Committer and has:
    - an additional field for the runtimeContext
    - setter and getter for this field
    - an abstract method "void init()"

 2. in the setup() method of GlobalCommitterOperator (which is an operator and 
thus has a runtimeContext) adding the following lines at the end:

    if (committer instanceof CommitterWithRuntimeContext) {
        ((CommitterWithRuntimeContext) committer).setRuntimeContext(getRuntimeContext());
        ((CommitterWithRuntimeContext) committer).init();
    }

I can then implement the method CommitterWithRuntimeContext::init() in our code 
and call the method CommitterWithRuntimeContext::getRuntimeContext() when I 
need the runtimeContext.

Is there another way to get the runtimeContext in a global commit? If not, is 
it justified to propose a feature request for a future release, where the 
global commit method can be implemented in a way that the user has access to 
the runtimeContext?

Best regards and thanks in advance
Tobias Fröhlich


Re: Watermark in global commit

2023-02-15 Thread Tobias Fröhlich
Dear Jan,

thank you for your answer!
The logic that ensures consistency should already be implemented in the 
TwoPhaseCommittingSink and the WithPostCommitTopology. So I would rather 
use these well-tested classes than implement my own logic for this.

Best regards
Tobias



Hi,

I'm not an expert on Flink specifically, but your problem might be easier 
to solve when broken down into two steps. First, create a "stable" input 
for downstream processing; this might include a specific watermark. In 
Flink, the "stability" of input for downstream processing is ensured by 
a checkpoint. You would therefore need to wait for a checkpoint, 
buffering intermediate data in state (and producing the particular 
watermark as a data element, because watermarks in general need not be 
'stable'). Second, once a checkpoint is completed, you would flush the 
buffer to the downstream operators: one would create the parquet files, 
the other would take whatever action is needed based on the watermark. 
The checkpoint ensures that the two tasks are eventually consistent (if 
this is sufficient for your case).

In Apache Beam, we mark such a transform with '@RequiresStableInput' [1]; 
its implementation on Flink works as I described above.
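
The buffering scheme described above can be sketched in plain Java, without any Flink dependency. This is only an illustration under assumptions: the names StableBuffer, Item, snapshot and notifyCheckpointComplete are hypothetical, and in a real job the buffer would live in operator state with the two calls driven by Flink's checkpoint callbacks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class StableInputBufferSketch {

    // A buffered element: either a data record or a watermark carried as data.
    static final class Item {
        final String data;     // null for watermark items
        final Long watermark;  // null for data items

        private Item(String data, Long watermark) {
            this.data = data;
            this.watermark = watermark;
        }

        static Item data(String value) { return new Item(value, null); }
        static Item watermark(long ts) { return new Item(null, ts); }
    }

    // Holds elements back until the checkpoint that contains them has completed.
    static final class StableBuffer {
        private final List<Item> pending = new ArrayList<>();
        private final TreeMap<Long, List<Item>> snapshots = new TreeMap<>();

        void add(Item item) {
            pending.add(item);
        }

        // Called when a checkpoint is taken: everything buffered so far belongs to it.
        void snapshot(long checkpointId) {
            snapshots.put(checkpointId, new ArrayList<>(pending));
            pending.clear();
        }

        // Called when a checkpoint has completed: release all elements that were
        // part of this checkpoint or an earlier one, in arrival order.
        List<Item> notifyCheckpointComplete(long checkpointId) {
            NavigableMap<Long, List<Item>> done = snapshots.headMap(checkpointId, true);
            List<Item> stable = new ArrayList<>();
            for (List<Item> items : done.values()) {
                stable.addAll(items);
            }
            done.clear(); // the view is backed by the map, so this removes the entries
            return stable;
        }
    }

    public static void main(String[] args) {
        StableBuffer buffer = new StableBuffer();
        buffer.add(Item.data("event-1"));
        buffer.add(Item.watermark(100L));
        buffer.snapshot(1L);
        buffer.add(Item.data("event-2")); // arrives after the snapshot, stays buffered
        System.out.println(buffer.notifyCheckpointComplete(1L).size());
    }
}
```

Elements that arrive after a snapshot stay buffered until the next checkpoint completes, which is what makes the flushed output safe to act on downstream.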

  Jan

[1] 
https://beam.apache.org/releases/javadoc/2.44.0/org/apache/beam/sdk/transforms/DoFn.RequiresStableInput.html

On 2/14/23 13:23, Tobias Fröhlich wrote:
> Dear flink team,
>
> I am facing the following problem: I would need to write events to parquet 
> files using the FileSink. Subsequently, I want to do something else in a 
> global commit where I need the corresponding watermark. However, the 
> org.apache.flink.connector.file.sink.FileSink forces the type of the 
> committables to be org.apache.flink.connector.file.sink.FileSinkCommittable 
> which can not carry watermarks.
>
> Details:
>
> As far as I understand the idea of a two-phase commit with a global 
> committer, the committables are used for passing information from the writer 
> to the global committer. This is done by implementing two methods in the 
> writer and the committer, respectively:
>
>   1. Collection<CommT> 
> TwoPhaseCommittingSink.PrecommittingSinkWriter::prepareCommit() that returns 
> a collection of committables of some type CommT and
>   2. void Committer::commit(Collection<CommitRequest<CommT>>) that uses this 
> collection.
>
> In general, the type CommT can be chosen arbitrarily. So, if the watermark is 
> needed in the global commit, it is possible to use a customized object that 
> contains a field for the watermark. However, if the class 
> org.apache.flink.connector.file.sink.FileSink is used, the type for the 
> committables is always 
> org.apache.flink.connector.file.sink.FileSinkCommittable which does not have 
> a field that can be used for the watermark.
>
> The only solution I found was by forking the flink source code and 
> augmenting it in the following way:
>
>   1. adding a field to FileSinkCommittable ("private long watermark;" with 
> getter and setter)
>   2. changing the FileSinkCommittableSerializer accordingly (this makes it 
> necessary to define a new version)
>   3. in fileWriter::prepareCommit() adding a loop over all committables to 
> set the watermark
>
>
> Am I missing something? Is there an easier way to get the watermarks from the 
> writer to the global committer? If not, is it justified to propose a feature 
> request?
>
> Best regards and thanks in advance
> Tobias Fröhlich
>


Metrics or runtimeContext in global commit

2023-02-14 Thread Tobias Fröhlich
Dear flink team,

I would like to use metrics (which are then written to an InfluxDB) in the 
method 
org.apache.flink.api.connector.sink2.Committer::commit(Collection<CommitRequest<CommT>> committables) 
that I use for global commit. I use the helper method 
StandardSinkTopologies.addGlobalCommitter(...) to define the post-commit 
topology.

The problem is: When I implement the interface Committer, I cannot get the 
runtimeContext that I need for the metrics, because it is not an Operator.

The only solution I found was by cloning the flink source code and amending it 
in the following way:

 1. declaring an abstract class "CommitterWithRuntimeContext" that 
implements Committer and has:
    - an additional field for the runtimeContext
    - setter and getter for this field
    - an abstract method "void init()"

 2. in the setup() method of GlobalCommitterOperator (which is an operator and 
thus has a runtimeContext) adding the following lines at the end:

    if (committer instanceof CommitterWithRuntimeContext) {
        ((CommitterWithRuntimeContext) committer).setRuntimeContext(getRuntimeContext());
        ((CommitterWithRuntimeContext) committer).init();
    }

I can then implement the method CommitterWithRuntimeContext::init() in our code 
and call the method CommitterWithRuntimeContext::getRuntimeContext() when I 
need the runtimeContext.
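
The workaround described above can be sketched as follows. This is a self-contained illustration, not Flink code: the interfaces Committer and RuntimeContext, the class MapContext and the static setup() method are simplified stand-ins defined here, and CountingCommitter is a hypothetical user class. Only the names CommitterWithRuntimeContext, setRuntimeContext, getRuntimeContext and init are taken from the description.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

public class CommitterContextSketch {

    // Stand-in for Flink's RuntimeContext; only a counter registry is modelled.
    interface RuntimeContext {
        void incrementCounter(String name, long delta);
        long counterValue(String name);
    }

    // Stand-in for Flink's Committer; the real commit() takes commit requests.
    interface Committer<CommT> {
        void commit(Collection<CommT> committables);
    }

    // Step 1: abstract committer that stores the context and offers an init() hook.
    abstract static class CommitterWithRuntimeContext<CommT> implements Committer<CommT> {
        private RuntimeContext runtimeContext;

        public void setRuntimeContext(RuntimeContext runtimeContext) {
            this.runtimeContext = runtimeContext;
        }

        public RuntimeContext getRuntimeContext() {
            return runtimeContext;
        }

        public abstract void init();
    }

    // Step 2: what the operator's setup() would do once it has a runtime context.
    static void setup(Committer<?> committer, RuntimeContext context) {
        if (committer instanceof CommitterWithRuntimeContext) {
            CommitterWithRuntimeContext<?> c = (CommitterWithRuntimeContext<?>) committer;
            c.setRuntimeContext(context);
            c.init();
        }
    }

    // Hypothetical user committer that reports a metric through the injected context.
    static class CountingCommitter extends CommitterWithRuntimeContext<String> {
        boolean initialized = false;

        @Override
        public void init() {
            initialized = true;
        }

        @Override
        public void commit(Collection<String> committables) {
            getRuntimeContext().incrementCounter("committed", committables.size());
        }
    }

    // In-memory context used only by this sketch.
    static class MapContext implements RuntimeContext {
        private final Map<String, Long> counters = new HashMap<>();

        @Override
        public void incrementCounter(String name, long delta) {
            counters.merge(name, delta, Long::sum);
        }

        @Override
        public long counterValue(String name) {
            return counters.getOrDefault(name, 0L);
        }
    }

    public static void main(String[] args) {
        MapContext context = new MapContext();
        CountingCommitter committer = new CountingCommitter();
        setup(committer, context);
        committer.commit(Arrays.asList("file-a", "file-b"));
        System.out.println(context.counterValue("committed"));
    }
}
```

Committers that do not extend the abstract class are left untouched by setup(), so existing implementations would keep working unchanged.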

Is there another way to get the runtimeContext in a global commit? If not, is 
it justified to propose a feature request for a future release, where the 
global commit method can be implemented in a way that the user has access to 
the runtimeContext?

Best regards and thanks in advance
Tobias Fröhlich


Watermark in global commit

2023-02-14 Thread Tobias Fröhlich
Dear flink team,

I am facing the following problem: I would need to write events to parquet 
files using the FileSink. Subsequently, I want to do something else in a global 
commit where I need the corresponding watermark. However, the 
org.apache.flink.connector.file.sink.FileSink forces the type of the 
committables to be org.apache.flink.connector.file.sink.FileSinkCommittable 
which can not carry watermarks.

Details:

As far as I understand the idea of a two-phase commit with a global committer, 
the committables are used for passing information from the writer to the global 
committer. This is done by implementing two methods in the writer and the 
committer, respectively:

 1. Collection<CommT> 
TwoPhaseCommittingSink.PrecommittingSinkWriter::prepareCommit() that returns a 
collection of committables of some type CommT and 
 2. void Committer::commit(Collection<CommitRequest<CommT>>) that uses this 
collection.

In general, the type CommT can be chosen arbitrarily. So, if the watermark is 
needed in the global commit, it is possible to use a customized object that 
contains a field for the watermark. However, if the class 
org.apache.flink.connector.file.sink.FileSink is used, the type for the 
committables is always org.apache.flink.connector.file.sink.FileSinkCommittable 
which does not have a field that can be used for the watermark.

The only solution I found was by forking the flink source code and augmenting 
it in the following way:

  1. adding a field to FileSinkCommittable ("private long watermark;" with 
getter and setter)
  2. changing the FileSinkCommittableSerializer accordingly (this makes it 
necessary to define a new version)
  3. in fileWriter::prepareCommit() adding a loop over all committables to set 
the watermark
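
The three steps above can be sketched in a self-contained way. Everything here is an assumption made for illustration: WatermarkedCommittable stands in for a committable with an added watermark field, VersionedSerializer is a simplified stand-in shaped like a versioned serializer (without checked exceptions), and version 1 is assumed to be an older layout that stored only the path.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class WatermarkCommittableSketch {

    // Simplified stand-in for a versioned serializer interface.
    interface VersionedSerializer<T> {
        int getVersion();
        byte[] serialize(T value);
        T deserialize(int version, byte[] bytes);
    }

    // Step 1: a committable that carries the watermark next to its payload.
    static final class WatermarkedCommittable {
        static final long NO_WATERMARK = Long.MIN_VALUE;
        final String path;
        final long watermark;

        WatermarkedCommittable(String path, long watermark) {
            this.path = path;
            this.watermark = watermark;
        }
    }

    // Step 2: the serializer gets a new version because the byte layout changed.
    static final class Serializer implements VersionedSerializer<WatermarkedCommittable> {
        @Override
        public int getVersion() {
            return 2;
        }

        @Override
        public byte[] serialize(WatermarkedCommittable c) {
            byte[] path = c.path.getBytes(StandardCharsets.UTF_8);
            return ByteBuffer.allocate(4 + path.length + 8)
                    .putInt(path.length)
                    .put(path)
                    .putLong(c.watermark)
                    .array();
        }

        @Override
        public WatermarkedCommittable deserialize(int version, byte[] bytes) {
            if (version != 1 && version != 2) {
                throw new IllegalArgumentException("Unknown version: " + version);
            }
            ByteBuffer buffer = ByteBuffer.wrap(bytes);
            byte[] path = new byte[buffer.getInt()];
            buffer.get(path);
            // Version 1 data has no watermark, so fall back to a sentinel value.
            long watermark = version == 2 ? buffer.getLong() : WatermarkedCommittable.NO_WATERMARK;
            return new WatermarkedCommittable(new String(path, StandardCharsets.UTF_8), watermark);
        }
    }

    // Step 3: loop over all committables and attach the writer's current watermark.
    static List<WatermarkedCommittable> withWatermark(List<String> paths, long watermark) {
        List<WatermarkedCommittable> result = new ArrayList<>();
        for (String path : paths) {
            result.add(new WatermarkedCommittable(path, watermark));
        }
        return result;
    }
}
```

Keeping the old version readable matters because committables written before the change may still be restored from a checkpoint after the upgrade.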


Am I missing something? Is there an easier way to get the watermarks from the 
writer to the global committer? If not, is it justified to propose a feature 
request?

Best regards and thanks in advance
Tobias Fröhlich