Hi, I have some questions about the new StreamingFileSink in Flink 1.6.
My use case is pretty simple. I have a Cassandra table with 150 million rows, partitioned into buckets of 100,000 rows. My job exports each bucket to a file (1 bucket = 1 file), so it is designed like this: the source emits the bucket list; a flatMap task then fetches the rows matching each bucket and emits all 100,000 Cassandra rows to the collector; finally a StreamingFileSink writes each row into one file per bucket (row format). Checkpointing is enabled every 10 s, the rolling policy is OnCheckpointRollingPolicy, and the bucket assigner assigns by bucketId (my bucketId, not the sink's :).

My problem: at the end of the job, I only have in-progress part files for each bucket. I do not understand how to trigger the finalization of the sink and have the bucket part files committed.

So I read the code of the StreamingFileSink and Bucket classes. If I have understood correctly, an in-progress part file can be closed (the closePartFile method of Bucket) and moved to "pending" state, according to the rolling policy, where it waits for a checkpoint before being moved to "finished" state. So the rolling policy can roll part files on every record, on every bucket check interval, or on every checkpoint; in my case, with OnCheckpointRollingPolicy, it rolls only on each checkpoint. Am I right? And when a checkpoint completes successfully, all pending files are moved to "finished" state and become usable by other jobs.

This is where I start losing myself. One thing surprised me in the code of the close method of the StreamingFileSink: it discards <https://github.com/apache/flink/blob/852502b7d51f91c6f9c3479424516d0b9ae255e5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java#L293> all active buckets since the last successful checkpoint! But at the end of a successful job, no checkpoint is triggered automatically if the minimum interval since the last checkpoint has not elapsed.
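For reference, this is roughly how my sink is wired up. It is only a sketch: `outputPath`, `MyRow`, and `BucketIdAssigner` are placeholders for my actual code, and the exact builder method names may differ slightly between 1.6 and later versions.

```java
// Sketch of the sink setup described above (StreamingFileSink, row format).
// outputPath, MyRow and BucketIdAssigner stand in for my real job's types.
StreamingFileSink<MyRow> sink = StreamingFileSink
        .forRowFormat(new Path(outputPath), new SimpleStringEncoder<MyRow>("UTF-8"))
        .withBucketAssigner(new BucketIdAssigner())           // buckets by my bucketId
        .withRollingPolicy(OnCheckpointRollingPolicy.build()) // roll only on checkpoints
        .build();

env.enableCheckpointing(10_000); // checkpoint every 10 s
stream.addSink(sink);
```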
So what happens to the data written since the last checkpoint? (Is this sink only for endless streams?) How do I get all my files, with all my data, into "finished" state when my job finishes successfully? Do I need to trigger a checkpoint manually? Is there a better-fitting sink for my use case? Should I use another rolling policy? Even with the bucket check interval, there is still a window between the last roll and the end of the job during which some part files are neither closed nor committed.

Even in the case of an endless stream, I suggest improving the behavior of the close method by calling closePartFile on each active bucket, so that all valid data since the last checkpoint is committed to pending state, waiting for the checkpoint at the end of the job. This seems to be what the BucketingSink does <https://github.com/apache/flink/blob/852502b7d51f91c6f9c3479424516d0b9ae255e5/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L437> -> I can do a PR for this.

I'm open to all suggestions :)

Regards,

Benoit MERIAUX
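To make the problem concrete, here is a tiny framework-free simulation of the part-file lifecycle as I understand it. The state names are Flink's; the class and method names are mine, not Flink's API.

```java
import java.util.*;

// Framework-free model of the part-file lifecycle as I read it from
// Bucket/Buckets: in-progress -> pending (on roll) -> finished (on commit).
class BucketLifecycleSim {
    enum PartState { IN_PROGRESS, PENDING, FINISHED }

    final Map<String, PartState> parts = new LinkedHashMap<>();

    // Writing to a bucket opens (or keeps) an in-progress part file.
    void write(String bucketId) {
        parts.putIfAbsent(bucketId + ".part", PartState.IN_PROGRESS);
    }

    // OnCheckpointRollingPolicy: a checkpoint rolls in-progress parts to pending...
    void onCheckpoint() {
        parts.replaceAll((f, s) -> s == PartState.IN_PROGRESS ? PartState.PENDING : s);
    }

    // ...and the checkpoint-complete notification commits pending parts to finished.
    void onCheckpointComplete() {
        parts.replaceAll((f, s) -> s == PartState.PENDING ? PartState.FINISHED : s);
    }

    // Current close() behavior as I read Buckets.java: active (in-progress)
    // parts since the last checkpoint are discarded.
    void closeDiscarding() {
        parts.values().removeIf(s -> s == PartState.IN_PROGRESS);
    }

    // Proposed close(): call closePartFile on each active bucket, i.e. roll
    // in-progress parts to pending so a final checkpoint can still commit them.
    void closeProposed() {
        parts.replaceAll((f, s) -> s == PartState.IN_PROGRESS ? PartState.PENDING : s);
    }
}
```

With the current behavior, data written after the last checkpoint is simply dropped on close; with the proposed behavior it survives as a pending part file and only needs one final checkpoint to be committed.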