[ https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15974984#comment-15974984 ]

Seth Wiesman edited comment on FLINK-6315 at 4/19/17 4:20 PM:
--------------------------------------------------------------

[~StephanEwen] 

Regarding the first two points, that makes a lot of sense; thank you for 
clarifying. Perhaps there are more subtleties to checkpointing than I am 
currently aware of. As I currently understand it, a checkpoint may fail either 
due to a timeout or due to a task failure. A task failure leads to the job 
restarting from the last valid checkpoint. If that happens, even if some tasks 
complete their checkpoint and others fail before completing theirs, everything 
will "just work", because buckets are maintained using both the checkpoint id 
and the timestamp. 

The specific case that I am concerned about is when one checkpoint is timed out 
by the checkpoint coordinator but then the next one succeeds. Assume that 
checkpoint 1 is successful, checkpoint 2 times out, and then checkpoint 3 is 
successful. In this case, the data processed between checkpoints 1 and 2 is 
never reprocessed. Because I cannot assume that any particular operator 
instance completed checkpoint 2, I am looking for a way to push the work each 
instance owed to checkpoint 2 into checkpoint 3. 
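A minimal sketch of one way the scenario above could be handled, assuming each piece of pending output is tagged with the id of the checkpoint at which it was staged, and that the staged map is itself part of the operator's checkpointed state. `PendingCommitTracker`, `stage`, and `onCheckpointComplete` are hypothetical names, not Flink API, and the external commit is simulated by appending to a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch (not Flink API): pending output is tracked per checkpoint id,
 * and completing checkpoint n commits everything staged for any id <= n.
 * A checkpoint that timed out is therefore folded into the next successful one.
 */
public class PendingCommitTracker {
    // checkpointId -> files staged during that checkpoint's snapshot (assumed to be checkpointed state)
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    // Stand-in for the external system (e.g. files made visible in S3)
    private final List<String> committed = new ArrayList<>();

    /** Called from the snapshot step: stage the files written since the previous snapshot. */
    public void stage(long checkpointId, List<String> files) {
        pending.computeIfAbsent(checkpointId, id -> new ArrayList<>()).addAll(files);
    }

    /** Called when a checkpoint completes: commit every staged entry up to and including checkpointId. */
    public void onCheckpointComplete(long checkpointId) {
        Map<Long, List<String>> ready = pending.headMap(checkpointId, true);
        for (List<String> files : ready.values()) {
            committed.addAll(files); // simulated external commit, in ascending checkpoint order
        }
        ready.clear(); // headMap is a view, so this also removes the entries from 'pending'
    }

    public List<String> committed() { return committed; }

    public int pendingCount() { return pending.size(); }

    public static void main(String[] args) {
        PendingCommitTracker tracker = new PendingCommitTracker();
        tracker.stage(1, List.of("part-1"));
        tracker.onCheckpointComplete(1);
        tracker.stage(2, List.of("part-2")); // checkpoint 2 times out: no completion call arrives
        tracker.stage(3, List.of("part-3"));
        tracker.onCheckpointComplete(3);     // commits part-2 and part-3 together
        System.out.println(tracker.committed()); // [part-1, part-2, part-3]
    }
}
```

Because completing checkpoint 3 commits every entry with an id up to 3, this variant absorbs the timed-out checkpoint without a dedicated timeout callback; the trade-off is that checkpoint 2's output stays staged until some later checkpoint completes.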

Does this make sense, or do I have some misunderstanding of checkpoint 
failures? 



> Notify on checkpoint timeout 
> -----------------------------
>
>                 Key: FLINK-6315
>                 URL: https://issues.apache.org/jira/browse/FLINK-6315
>             Project: Flink
>          Issue Type: New Feature
>          Components: Core
>            Reporter: Seth Wiesman
>            Assignee: Seth Wiesman
>
> A common use case when writing a custom operator that outputs data to some 
> third-party location is to partially output on checkpoint and then commit on 
> notifyCheckpointComplete. If that external system does not gracefully handle 
> rollbacks (such as Amazon S3 not allowing consistent delete operations), then 
> that data needs to be handled by the next checkpoint. 
> The idea is to add a new interface similar to CheckpointListener that 
> provides a callback when the CheckpointCoordinator times out a checkpoint:
> {code:java}
> /**
>  * This interface must be implemented by functions/operations that want to
>  * receive a notification if a checkpoint has been timed out by the
>  * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator}.
>  */
> public interface CheckpointTimeoutListener {
>       /**
>        * This method is called as a notification if a distributed checkpoint
>        * has been timed out.
>        *
>        * @param checkpointId The ID of the checkpoint that has been timed out.
>        * @throws Exception if handling the notification fails.
>        */
>       void notifyCheckpointTimeout(long checkpointId) throws Exception;
> }
> {code}
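A sketch of how an operator might use the proposed callback. The interface is redeclared locally because it only exists as a proposal in this issue; `CarryOverSink`, `write`, `snapshot`, and the in-memory "commit" are hypothetical stand-ins, and `notifyCheckpointComplete` here merely mirrors the CheckpointListener method name. It assumes the timeout notification for checkpoint n arrives before the snapshot for checkpoint n+1; if it arrives later, the carried-over output is simply staged with the checkpoint after that.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Local copy of the interface proposed in FLINK-6315; it is not part of Flink. */
interface CheckpointTimeoutListener {
    void notifyCheckpointTimeout(long checkpointId) throws Exception;
}

/** Hypothetical sink: output staged for a timed-out checkpoint is carried into the next snapshot. */
public class CarryOverSink implements CheckpointTimeoutListener {
    private final Map<Long, List<String>> staged = new HashMap<>(); // checkpointId -> staged output
    private final List<String> carryOver = new ArrayList<>();       // output orphaned by a timeout
    private final List<String> committed = new ArrayList<>();       // stand-in for the external system
    private List<String> currentBuffer = new ArrayList<>();         // output since the last snapshot

    void write(String file) { currentBuffer.add(file); }

    /** Snapshot: stage buffered output plus anything carried over from timed-out checkpoints. */
    void snapshot(long checkpointId) {
        List<String> batch = new ArrayList<>(carryOver);
        batch.addAll(currentBuffer);
        carryOver.clear();
        currentBuffer = new ArrayList<>();
        staged.put(checkpointId, batch);
    }

    /** Completion: commit exactly what was staged for this checkpoint. */
    void notifyCheckpointComplete(long checkpointId) {
        List<String> batch = staged.remove(checkpointId);
        if (batch != null) committed.addAll(batch);
    }

    /** Timeout: hand the staged output to the next checkpoint instead of losing it. */
    @Override
    public void notifyCheckpointTimeout(long checkpointId) {
        List<String> batch = staged.remove(checkpointId);
        if (batch != null) carryOver.addAll(batch);
    }

    List<String> committed() { return committed; }

    public static void main(String[] args) {
        CarryOverSink sink = new CarryOverSink();
        sink.write("part-1"); sink.snapshot(1); sink.notifyCheckpointComplete(1);
        sink.write("part-2"); sink.snapshot(2); sink.notifyCheckpointTimeout(2);
        sink.write("part-3"); sink.snapshot(3); sink.notifyCheckpointComplete(3);
        System.out.println(sink.committed()); // [part-1, part-2, part-3]
    }
}
```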



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
