Hi,

I'm trying to understand the exactly-once semantics of the
StreamingFileSink with S3 in Flink 1.7.1, and I'm a bit confused about how
it guarantees exactly-once under one specific failure scenario.

For simplicity, let's say we roll the current part file on checkpoint
(and only on checkpoint). The process is as follows:
1. Framework tells the sink to prepare for a checkpoint. This ultimately
results in 'onReceptionOfCheckpoint' being called on Bucket.java.
2. This takes the current part file and, per our policy of rolling on
checkpoint, closes it and uploads it to S3 as part of an MPU; the
reference to this upload is stored in 'pendingPartsPerCheckpoint'.
3. Once the checkpoint successfully completes, the bucket is notified via
'onSuccessfulCompletionOfCheckpoint'. At this point, the bucket goes
through all entries in 'pendingPartsPerCheckpoint' and, for each of them,
recovers the in-progress part (which doesn't exist in this scenario) and
then commits the upload.
4. The AmazonS3Client is ultimately called to complete the multipart
upload, retrying the attempt up to N times. If it exhausts its retries, it
throws an exception.
5. Upon successful commit of the MPU, Bucket clears out its references to
these uploads from its state.
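
To make the question concrete, here is a minimal, self-contained Java sketch of my understanding of that lifecycle. The class and method names loosely mirror Bucket.java, but everything here (BucketSketch, FakeS3, the use of plain strings as upload references) is a simplified stand-in I made up, not Flink's actual code:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the per-bucket state kept by Bucket.java.
class BucketSketch {
    // checkpoint id -> pending multipart uploads awaiting commit
    final Map<Long, List<String>> pendingPartsPerCheckpoint = new HashMap<>();
    String inProgressPart = "part-0";

    // Steps 1-2: roll the in-progress part and stage it for commit.
    void onReceptionOfCheckpoint(long checkpointId) {
        List<String> pending = new ArrayList<>();
        pending.add(inProgressPart);            // closed + uploaded as part of an MPU
        pendingPartsPerCheckpoint.put(checkpointId, pending);
        inProgressPart = null;                  // rolled on checkpoint
    }

    // Steps 3-5: commit every staged upload, then drop it from state.
    void onSuccessfulCompletionOfCheckpoint(long checkpointId, FakeS3 s3) {
        for (String upload : pendingPartsPerCheckpoint.getOrDefault(checkpointId, List.of())) {
            s3.completeMultipartUpload(upload);          // Step 4 (retried internally)
        }
        pendingPartsPerCheckpoint.remove(checkpointId);  // Step 5: clear state
    }
}

// Trivial stand-in for AmazonS3Client; only records which uploads were completed.
class FakeS3 {
    final List<String> committed = new ArrayList<>();
    void completeMultipartUpload(String uploadId) { committed.add(uploadId); }
}

public class Main {
    public static void main(String[] args) {
        BucketSketch bucket = new BucketSketch();
        FakeS3 s3 = new FakeS3();
        bucket.onReceptionOfCheckpoint(1L);
        bucket.onSuccessfulCompletionOfCheckpoint(1L, s3);
        System.out.println(s3.committed);                      // [part-0]
        System.out.println(bucket.pendingPartsPerCheckpoint);  // {}
    }
}
```

On the happy path, the upload is committed exactly once and the state is cleared, which is what I'd expect.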

Given this flow, I'm having trouble understanding how the following
scenario works:

   - Step 4: The commit on the MPU succeeds,
   - Step 5: Before this step completes, the task crashes. So at this point, S3
   has successfully completed the MPU but to the client (the Flink job), it
   has not completed.
   - Flink will then recover from the checkpoint we just took, and steps 3
   and 4 will be repeated. My understanding is that, since the MPU succeeded
   previously, any attempt at re-committing that upload will result in a 404
   ('NoSuchUpload'), so Step 4 should throw an exception, which would then
   get retried by the framework, and the process repeats itself.
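
Here is a toy simulation of the exact race I mean. MockS3 is again a stand-in I wrote, but it mimics S3's documented behavior: once CompleteMultipartUpload succeeds, the upload id is gone, and completing it a second time fails with NoSuchUpload:

```java
import java.util.HashSet;
import java.util.Set;

// Stand-in for S3: completing an MPU removes its upload id, and a second
// CompleteMultipartUpload on that id fails with NoSuchUpload (404).
class MockS3 {
    private final Set<String> openUploads = new HashSet<>();
    void startMultipartUpload(String uploadId) { openUploads.add(uploadId); }
    void completeMultipartUpload(String uploadId) {
        if (!openUploads.remove(uploadId)) {
            throw new IllegalStateException("404 NoSuchUpload: " + uploadId);
        }
    }
}

public class Main {
    public static void main(String[] args) {
        MockS3 s3 = new MockS3();
        s3.startMultipartUpload("upload-1");

        // Step 4: the first commit succeeds.
        s3.completeMultipartUpload("upload-1");

        // ...the task crashes before Step 5 clears the state, so after
        // recovery the checkpoint still references "upload-1" and Step 4
        // runs again:
        try {
            s3.completeMultipartUpload("upload-1");
        } catch (IllegalStateException e) {
            System.out.println("re-commit failed: " + e.getMessage());
        }
    }
}
```

The re-commit fails every time in this simulation, which is why I don't see how the recovered job ever makes progress past this checkpoint.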

So how is this case handled?

Really appreciate the help!
-Kaustubh
