Hi, I'm trying to understand the exactly-once semantics of the StreamingFileSink with S3 in Flink 1.7.1, and I'm a bit confused about how it guarantees exactly-once under a very specific failure scenario.
For simplicity, let's say we roll the current part file on checkpoint (and only on checkpoint). The process is as follows:

1. The framework tells the sink to prepare for a checkpoint. This ultimately results in 'onReceptionOfCheckpoint' being called on Bucket.java.
2. Based on our roll policy of rolling on checkpoint, this closes the current part file and uploads it to S3 as part of a multi-part upload (MPU); the reference to this upload is stored in 'pendingPartsPerCheckpoint'.
3. Once the checkpoint completes successfully, the bucket is notified via 'onSuccessfulCompletionOfCheckpoint'. At this point, the bucket goes through all of 'pendingPartsPerCheckpoint' and, for each entry, recovers the in-progress part (which doesn't exist in this scenario) and then commits the upload.
4. The AmazonS3Client is ultimately called to perform the commit, and it retries the attempt up to N times. If it exhausts its retries, it throws an exception.
5. Upon successful commit of the MPU, the Bucket clears the references to these uploads from its state.

Given this flow, I'm having trouble understanding how the following scenario works:

- Step 4: the commit of the MPU succeeds.
- Step 5: before this step completes, the task crashes. So at this point S3 has successfully completed the MPU, but to the client (the Flink job) it has not completed.
- Flink then recovers from the checkpoint we just took, and steps 3 and 4 are repeated.

My understanding is that, since the MPU already succeeded, any attempt at re-committing that upload will result in a 404 ('NoSuchUpload'), so step 4 should throw an exception, which the framework would then retry, and the process repeats itself.

So how is this case handled? Really appreciate the help!

-Kaustubh
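P.S. To make the scenario concrete, here is a toy sketch of why I expect the replayed commit to fail. This is not the real AmazonS3Client or Bucket.java code; the class, the in-memory maps, and the exception are all made up to model one property of S3: once a multi-part upload is completed, its upload id is forgotten, so completing it a second time fails.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the race from the question: the MPU commit succeeds on the
// "S3" side, the task crashes before clearing its state (step 5), and the
// recovered job replays the commit (step 4) with the same upload id.
public class MpuRecommit {

    // Stand-in for S3's 404 on a completed/unknown upload id.
    static class NoSuchUploadException extends RuntimeException {
        NoSuchUploadException(String uploadId) {
            super("NoSuchUpload: " + uploadId);
        }
    }

    // Pending multi-part uploads, keyed by upload id.
    static final Map<String, byte[]> pendingUploads = new HashMap<>();
    // Committed objects, keyed by object key.
    static final Map<String, byte[]> objects = new HashMap<>();

    static String startUpload(String key, byte[] data) {
        String uploadId = "upload-" + key;
        pendingUploads.put(uploadId, data);
        return uploadId;
    }

    // Commit is one-shot: the service forgets the upload id after completion,
    // so a second commit of the same id throws.
    static void commit(String key, String uploadId) {
        byte[] data = pendingUploads.remove(uploadId);
        if (data == null) {
            throw new NoSuchUploadException(uploadId);
        }
        objects.put(key, data);
    }

    public static void main(String[] args) {
        String id = startUpload("part-0-0", new byte[]{1, 2, 3});

        commit("part-0-0", id); // step 4: the commit succeeds on the S3 side

        // Step 5 never runs: the task crashes before clearing its reference
        // to the upload, so after recovery the job still holds `id` and
        // replays the commit.
        try {
            commit("part-0-0", id); // replayed step 4 after recovery
            System.out.println("recommit succeeded");
        } catch (NoSuchUploadException e) {
            System.out.println("recommit failed: " + e.getMessage());
        }
    }
}
```

Running this prints that the re-commit fails with NoSuchUpload, which is exactly the loop I'm worried about: the framework retries, hits the same 404, and repeats.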