Re: Watermark in global commit

2023-02-15 Thread Tobias Fröhlich
Dear Jan,

thank you for your answer!
The logic that ensures consistency should already be implemented in the 
TwoPhaseCommittingSink and the WithPostCommitTopology. So I would rather 
use these well-tested classes than implement my own logic for this.

Best regards
Tobias





Re: Watermark in global commit

2023-02-14 Thread Jan Lukavský

Hi,

I'm not an expert on Flink specifically, but your problem might be easier 
to solve when broken down into two steps. First, create a "stable" input 
to downstream processing; this might include a specific watermark. In 
Flink, the "stability" of input for downstream processing is ensured by 
a checkpoint. You would therefore need to wait for a checkpoint, 
buffering intermediate data in state (and emitting the particular 
watermark as a data element, because watermarks in general need not be 
'stable'). Second, once a checkpoint is completed, you would flush the 
buffer to the downstream operators: one would create the parquet files, 
the other would do whatever action needs to be taken based on the 
watermark. The checkpoint ensures that the two tasks would be eventually 
consistent (if this is sufficient for your case).
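
To make the buffering idea concrete, here is a minimal sketch in plain 
Java. It does not use any Flink API: StableBatch and StableInputBuffer 
are hypothetical names, and snapshot() / notifyCheckpointComplete() only 
loosely mirror Flink's checkpoint callbacks. It assumes events are 
strings and that the largest watermark seen so far travels with each 
batch as ordinary data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical batch: buffered events plus the watermark carried as data. */
final class StableBatch {
    final List<String> events;
    final long watermark;

    StableBatch(List<String> events, long watermark) {
        this.events = events;
        this.watermark = watermark;
    }
}

/** Hypothetical buffer that releases data only after a checkpoint completed. */
final class StableInputBuffer {
    private final List<String> pending = new ArrayList<>();
    private long lastWatermark = Long.MIN_VALUE;
    // Batches that belong to a checkpoint which has been taken but not yet confirmed.
    private final TreeMap<Long, StableBatch> snapshotted = new TreeMap<>();

    void add(String event) {
        pending.add(event);
    }

    void onWatermark(long watermark) {
        lastWatermark = Math.max(lastWatermark, watermark);
    }

    /** Called when the checkpoint is taken: freeze everything buffered so far. */
    void snapshot(long checkpointId) {
        snapshotted.put(checkpointId, new StableBatch(new ArrayList<>(pending), lastWatermark));
        pending.clear();
    }

    /** Called when the checkpoint is confirmed: release all batches up to that id. */
    List<StableBatch> notifyCheckpointComplete(long checkpointId) {
        NavigableMap<Long, StableBatch> done = snapshotted.headMap(checkpointId, true);
        List<StableBatch> out = new ArrayList<>(done.values());
        done.clear(); // the view is backed by the map, so this removes the released batches
        return out;
    }
}
```

Events added after snapshot() stay in the buffer until the next 
checkpoint, so downstream operators only ever see data (and a watermark) 
that is covered by a completed checkpoint.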


In Apache Beam, we call this operation a transform that 
'@RequiresStableInput' [1]; the implementation in Flink is as I 
described above.


 Jan

[1] 
https://beam.apache.org/releases/javadoc/2.44.0/org/apache/beam/sdk/transforms/DoFn.RequiresStableInput.html





Watermark in global commit

2023-02-14 Thread Tobias Fröhlich
Dear Flink team,

I am facing the following problem: I need to write events to parquet 
files using the FileSink. Subsequently, I want to do something else in a global 
commit where I need the corresponding watermark. However, the 
org.apache.flink.connector.file.sink.FileSink forces the type of the 
committables to be org.apache.flink.connector.file.sink.FileSinkCommittable, 
which cannot carry watermarks.

Details:

As far as I understand the idea of a two-phase commit with a global committer, 
the committables are used for passing information from the writer to the global 
committer. This is done by implementing two methods in the writer and the 
committer, respectively:

 1. Collection<CommT> 
TwoPhaseCommittingSink.PrecommittingSinkWriter::prepareCommit() that returns a 
collection of committables of some type CommT and 
 2. void Committer::commit(Collection<CommitRequest<CommT>>) that uses this 
collection.

In general, the type CommT can be chosen arbitrarily. So, if the watermark is 
needed in the global commit, it is possible to use a customized object that 
contains a field for the watermark. However, if the class 
org.apache.flink.connector.file.sink.FileSink is used, the type for the 
committables is always org.apache.flink.connector.file.sink.FileSinkCommittable 
which does not have a field that can be used for the watermark.
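
As an illustration of such a customized committable, here is a minimal 
sketch in plain Java without Flink dependencies. WatermarkedCommittable 
and GlobalCommitSketch are hypothetical names, not Flink classes, and 
taking the minimum over all committables is an assumption about what the 
"corresponding watermark" of a global commit should be.

```java
import java.util.Collection;
import java.util.OptionalLong;

/** Hypothetical committable that carries a watermark next to the file to commit. */
final class WatermarkedCommittable {
    final String pendingFile;
    final long watermark;

    WatermarkedCommittable(String pendingFile, long watermark) {
        this.pendingFile = pendingFile;
        this.watermark = watermark;
    }
}

/** Hypothetical global commit step that derives one watermark from all committables. */
final class GlobalCommitSketch {
    /**
     * Assumption: the global watermark is the minimum over all writers,
     * because the slowest writer bounds what is complete.
     * Returns an empty result if there is nothing to commit.
     */
    static OptionalLong globalWatermark(Collection<WatermarkedCommittable> committables) {
        return committables.stream().mapToLong(c -> c.watermark).min();
    }
}
```

With a committable type like this, the writer would fill in the 
watermark in prepareCommit() and the committer could read it back in 
commit().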

The only solution I found was forking the Flink source code and augmenting 
it in the following way:

  1. adding a field to FileSinkCommittable ("private long watermark;" with 
getter and setter)
  2. changing the FileSinkCommittableSerializer accordingly (this makes it 
necessary to define a new version)
  3. adding a loop over all committables in FileWriter::prepareCommit() to set 
the watermark
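
Step 2 could look roughly like the following sketch. It is plain Java and 
not the actual FileSinkCommittableSerializer: VersionedCommittable, 
CommittableSerializerSketch and the byte layout (a UTF string followed by 
a long) are made up for illustration, and the method names only imitate 
the getVersion/serialize/deserialize shape of a versioned serializer. The 
point is that version 2 adds the watermark while version 1 data can still 
be read.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical committable with the additional watermark field. */
final class VersionedCommittable {
    final String pendingFile;
    final long watermark;

    VersionedCommittable(String pendingFile, long watermark) {
        this.pendingFile = pendingFile;
        this.watermark = watermark;
    }
}

/** Hypothetical serializer: version 1 = file only, version 2 = file + watermark. */
final class CommittableSerializerSketch {
    static final int VERSION = 2;
    // Assumption: committables written before the change get this placeholder watermark.
    static final long NO_WATERMARK = Long.MIN_VALUE;

    int getVersion() {
        return VERSION;
    }

    /** Always writes the newest format (version 2). */
    byte[] serialize(VersionedCommittable committable) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(committable.pendingFile);
            out.writeLong(committable.watermark);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads both formats; the version tells which fields are present. */
    VersionedCommittable deserialize(int version, byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String pendingFile = in.readUTF();
            switch (version) {
                case 1:
                    return new VersionedCommittable(pendingFile, NO_WATERMARK);
                case 2:
                    return new VersionedCommittable(pendingFile, in.readLong());
                default:
                    throw new IllegalArgumentException("Unknown version: " + version);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Keeping the version 1 branch is what allows a job to restore from state 
written before the watermark field existed.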


Am I missing something? Is there an easier way to get the watermarks from the 
writer to the global committer? If not, is it justified to propose a feature 
request?

Best regards and thanks in advance
Tobias Fröhlich