Hi,

I have some questions about the new StreamingFileSink in 1.6.

My use case is pretty simple: I have a Cassandra table with 150 million rows,
partitioned into buckets of 100,000 rows each.

My job is to export each "bucket" to a file (1 bucket = 1 file), so the job
is designed like this:

The source emits the bucket list,
then a flatMap task fetches the rows matching each bucket and forwards all
100,000 rows from Cassandra to the collector,
then a StreamingFileSink writes each row into one file per bucket (row format).

Checkpointing is enabled with a 10 s interval.
The rolling policy is OnCheckpointRollingPolicy, and the bucket assigner
assigns by bucket id (my bucket id, not the sink's :).
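For reference, the wiring looks roughly like this. It is only a sketch: `BucketListSource`, `FetchBucketRows`, and `BucketIdAssigner` are placeholders for my own classes, and I am assuming the `OnCheckpointRollingPolicy.build()` factory from the current builder-style API.

```java
// Sketch of the export job; class names marked below are placeholders.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10_000); // checkpoint every 10 s

StreamingFileSink<String> sink = StreamingFileSink
    .forRowFormat(new Path("/export"), new SimpleStringEncoder<String>("UTF-8"))
    .withBucketAssigner(new BucketIdAssigner())            // placeholder: assigns by my bucket id
    .withRollingPolicy(OnCheckpointRollingPolicy.build())  // roll only on checkpoints
    .build();

env.addSource(new BucketListSource())     // placeholder: emits the bucket list
   .flatMap(new FetchBucketRows())        // placeholder: 100,000 rows per bucket from Cassandra
   .addSink(sink);

env.execute("cassandra-export");
```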

My problem is that at the end of the job, I only have in-progress part files
for each bucket.

I do not understand how to trigger the finalization of the sink so that the
bucket part files get committed.

So I read the code of the StreamingFileSink and the Bucket classes.

If I have understood correctly, an in-progress part file can be closed (the
closePartFile method of Bucket) and moved to the "pending" state according
to the rolling policy, where it waits for a checkpoint before being moved to
the "finished" state.

So the rolling policy can roll part files on every record, on every
bucket-check interval, or on every checkpoint.
In my case, with OnCheckpointRollingPolicy, it rolls only on each
checkpoint. Am I right?

And when a checkpoint completes successfully, all pending files are moved to
the "finished" state and become usable by other jobs.

This is where I start to get lost.

Indeed, one thing surprised me in the code of the close method of the
StreamingFileSink: it discards all active buckets since the last successful
checkpoint
<https://github.com/apache/flink/blob/852502b7d51f91c6f9c3479424516d0b9ae255e5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java#L293>.
But at the end of a successful job, no checkpoint is triggered automatically
if the minimum interval since the last checkpoint has not expired. So what
happens to the data written since the last checkpoint?
(Is this sink only intended for endless streams?)

How do I get all my files, with all their data, into the "finished" state
when my job finishes successfully?
Do I need to trigger a checkpoint manually?
Is there a sink that fits my use case better?
Should I use another rolling policy? Even with the bucket interval, there is
still a window between the last roll and the end of the job during which
some part files are neither closed nor committed.
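For completeness, the time/size-based alternative I considered is DefaultRollingPolicy, configured roughly like this (the thresholds are illustrative, and I am assuming the `create()` builder from the 1.6 API). It still leaves the same end-of-job window, since none of its triggers fire when the job finishes:

```java
// Rolls a part file when it exceeds a size, has been open too long,
// or has been idle too long -- but none of these fire at job end.
DefaultRollingPolicy<String, String> policy = DefaultRollingPolicy
    .create()
    .withMaxPartSize(128 * 1024 * 1024)  // roll at 128 MB
    .withRolloverInterval(60_000)        // roll after 60 s open
    .withInactivityInterval(30_000)      // roll after 30 s without writes
    .build();
```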

Even in the case of an endless stream, I suggest improving the behavior of
the close method by calling closePartFile on each active bucket, so that all
valid data since the last checkpoint is committed to the pending state,
waiting for the checkpoint at the end of the job.
This seems to be what the BucketingSink does
<https://github.com/apache/flink/blob/852502b7d51f91c6f9c3479424516d0b9ae255e5/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L437>.
-> I can do a PR for this.
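The change I have in mind is roughly the following. This is only a sketch of the idea, not a patch: it assumes `closePartFile` (from the Bucket class linked above) is reachable from Buckets#close, and real field/method visibility may differ.

```java
// Sketch: proposed behavior for Buckets#close().
// Instead of discarding in-progress files, roll every active bucket
// so its data moves to "pending" and survives until the final checkpoint.
public void close() throws IOException {
    for (Bucket<IN, BucketID> bucket : activeBuckets.values()) {
        bucket.closePartFile(); // assumption: accessible here; moves in-progress part to pending
    }
}
```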


I'm open to all suggestions :)

regards,

-- 
Benoit  *MERIAUX*
*OCTO Technology*
Consultant Confirmé - Tribu NAD
.................................................
34, Avenue de l'Opéra
75002 Paris
+33 (0)786 59 12 30
www.octo.com  - blog.octo.com
* www.usievents.com <http://www.usievents.com/> *
@USIEvents <https://twitter.com/USIEvents>  - @OCTOTechnology
<https://twitter.com/OCTOTechnology>
.................................................
