Hi,

I'm not an expert on Flink specifically, but your problem might be easier to solve when broken down into two steps. First, create a "stable" input for downstream processing; this input might include a specific watermark. In Flink, the "stability" of input for downstream processing is ensured by a checkpoint. You would therefore need to wait for a checkpoint, buffering intermediate data in state (and emitting the particular watermark as a data element, because watermarks in general need not be 'stable'). Second, once a checkpoint is completed, you would flush the buffer to the downstream operators: one would create the parquet files, the other would take whatever action is needed based on the watermark. The checkpoint ensures that the two tasks are eventually consistent (if this is sufficient for your case).

In Apache Beam, we call this a transform that '@RequiresStableInput' [1]; the implementation on Flink works as I described above.
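
To make the buffering idea concrete, here is a minimal sketch in plain Java. It deliberately does not use the Flink API: the class StableInputBuffer and its methods are hypothetical names that only mirror the usual checkpoint callbacks, and the real operator would keep the buffers in checkpointed state rather than in fields. The watermark travels as an ordinary element of the buffered stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Consumer;

/** Plain-Java model (no Flink dependency) of "buffer until the checkpoint completes". */
final class StableInputBuffer<T> {
    // Elements received since the last snapshot; not yet part of any checkpoint.
    private List<T> current = new ArrayList<>();
    // Elements captured by a snapshot, keyed by checkpoint id, awaiting completion.
    private final TreeMap<Long, List<T>> pending = new TreeMap<>();
    // Receives elements only once they are "stable".
    private final Consumer<T> downstream;

    StableInputBuffer(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    /** Buffers an element (data or a watermark encoded as data) instead of emitting it. */
    void processElement(T element) {
        current.add(element);
    }

    /** Called when a checkpoint is taken: the current buffer now belongs to that checkpoint. */
    void snapshotState(long checkpointId) {
        pending.put(checkpointId, current);
        current = new ArrayList<>();
    }

    /** Called when a checkpoint completes: flush every batch up to and including it. */
    void notifyCheckpointComplete(long checkpointId) {
        NavigableMap<Long, List<T>> done = pending.headMap(checkpointId, true);
        for (List<T> batch : done.values()) {
            batch.forEach(downstream);
        }
        done.clear(); // the view is backed by 'pending', so this removes the flushed batches
    }
}
```

Elements that arrive after the snapshot stay buffered until the next checkpoint completes, which is what makes the flushed output replay-safe after a failure.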

 Jan

[1] https://beam.apache.org/releases/javadoc/2.44.0/org/apache/beam/sdk/transforms/DoFn.RequiresStableInput.html

On 2/14/23 13:23, Tobias Fröhlich wrote:
Dear Flink team,

I am facing the following problem: I need to write events to parquet 
files using the FileSink. Subsequently, I want to do something else in a global 
commit where I need the corresponding watermark. However, the 
org.apache.flink.connector.file.sink.FileSink forces the type of the 
committables to be org.apache.flink.connector.file.sink.FileSinkCommittable, 
which cannot carry watermarks.

Details:

As far as I understand the idea of a two-phase commit with a global committer, 
the committables are used for passing information from the writer to the global 
committer. This is done by implementing two methods in the writer and the 
committer, respectively:

  1. Collection<CommT> TwoPhaseCommittingSink.PrecommittingSinkWriter::prepareCommit(),
     which returns a collection of committables of some type CommT, and
  2. void Committer::commit(Collection<CommitRequest<CommT>>), which uses this
     collection.

In general, the type CommT can be chosen arbitrarily. So, if the watermark is needed 
in the global commit, it is possible to use a customized object that contains a field 
for the watermark. However, if the class 
org.apache.flink.connector.file.sink.FileSink<IN> is used, the type for the 
committables is always org.apache.flink.connector.file.sink.FileSinkCommittable which 
does not have a field that can be used for the watermark.
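
For illustration, the general case (a freely chosen CommT with a watermark field) could look roughly like the following plain-Java sketch. All class names here (WatermarkedCommittable, SketchWriter, SketchCommitter) are hypothetical stand-ins and not Flink classes; in particular, the committer takes the committables directly instead of wrapping them in CommitRequest.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical committable: a finished file plus the watermark seen by the writer. */
final class WatermarkedCommittable {
    final String path;
    final long watermark;

    WatermarkedCommittable(String path, long watermark) {
        this.path = path;
        this.watermark = watermark;
    }
}

/** Simplified stand-in for a pre-committing sink writer. */
final class SketchWriter {
    private final List<String> finishedFiles = new ArrayList<>();
    private long currentWatermark = Long.MIN_VALUE;

    void fileFinished(String path) {
        finishedFiles.add(path);
    }

    /** Remembers the highest watermark seen so far. */
    void writeWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
    }

    /** Hands over all finished files, each stamped with the current watermark. */
    Collection<WatermarkedCommittable> prepareCommit() {
        List<WatermarkedCommittable> committables = new ArrayList<>();
        for (String path : finishedFiles) {
            committables.add(new WatermarkedCommittable(path, currentWatermark));
        }
        finishedFiles.clear();
        return committables;
    }
}

/** Simplified stand-in for the (global) committer. */
final class SketchCommitter {
    final List<String> committedPaths = new ArrayList<>();
    long maxWatermark = Long.MIN_VALUE;

    /** Commits the files and tracks the highest watermark carried by the committables. */
    void commit(Collection<WatermarkedCommittable> committables) {
        for (WatermarkedCommittable c : committables) {
            committedPaths.add(c.path);
            maxWatermark = Math.max(maxWatermark, c.watermark);
        }
    }
}
```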

The only solution I found was to fork the Flink source code and augment 
it in the following way:

   1. adding a field to FileSinkCommittable ("private long watermark;" with 
      getter and setter)
   2. changing the FileSinkCommittableSerializer accordingly (this makes it 
      necessary to define a new version)
   3. in FileWriter::prepareCommit(), adding a loop over all committables to set 
      the watermark
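
Steps 1 and 2 above can be sketched in plain Java as follows. This is not the actual FileSinkCommittable or FileSinkCommittableSerializer code: VersionedCommittable, VersionedCommittableSerializer, the byte layout, and the NO_WATERMARK default are assumptions made for the example. It only shows why a new field requires a new serializer version, so that committables written in the old format can still be read.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical committable with the additional watermark field (step 1). */
final class VersionedCommittable {
    private final String path;
    private long watermark;

    VersionedCommittable(String path, long watermark) {
        this.path = path;
        this.watermark = watermark;
    }

    String getPath() { return path; }
    long getWatermark() { return watermark; }
    void setWatermark(long watermark) { this.watermark = watermark; }
}

/** Hypothetical versioned serializer (step 2): version 1 = path only, version 2 = path + watermark. */
final class VersionedCommittableSerializer {
    static final int VERSION = 2;
    // Assumed default for committables that were written before the field existed.
    static final long NO_WATERMARK = Long.MIN_VALUE;

    int getVersion() {
        return VERSION;
    }

    /** Always writes the newest format. */
    byte[] serialize(VersionedCommittable committable) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(committable.getPath());
            out.writeLong(committable.getWatermark());
        }
        return bytes.toByteArray();
    }

    /** Reads both the old and the new format, depending on the stored version. */
    VersionedCommittable deserialize(int version, byte[] data) throws IOException {
        if (version != 1 && version != VERSION) {
            throw new IOException("Unsupported version: " + version);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String path = in.readUTF();
            long watermark = (version == 1) ? NO_WATERMARK : in.readLong();
            return new VersionedCommittable(path, watermark);
        }
    }
}
```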


Am I missing something? Is there an easier way to get the watermarks from the 
writer to the global committer? If not, is it justified to propose a feature 
request?

Best regards and thanks in advance
Tobias Fröhlich
