[jira] [Commented] (FLINK-6315) Notify on checkpoint timeout
[ https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977103#comment-15977103 ]

Stephan Ewen commented on FLINK-6315:
-------------------------------------

Please ping us anytime you have questions on this one...

> Notify on checkpoint timeout
> ----------------------------
>
> Key: FLINK-6315
> URL: https://issues.apache.org/jira/browse/FLINK-6315
> Project: Flink
> Issue Type: New Feature
> Components: Core
> Reporter: Seth Wiesman
> Assignee: Seth Wiesman
>
> A common use case when writing a custom operator that outputs data to some
> third-party location is to partially output on checkpoint and then commit on
> notifyCheckpointComplete. If that external system does not gracefully handle
> rollbacks (such as Amazon S3 not allowing consistent delete operations), then
> that data needs to be handled by the next checkpoint.
> The idea is to add a new interface similar to CheckpointListener that
> provides a callback when the CheckpointCoordinator times out a checkpoint.
> {code:java}
> /**
>  * This interface must be implemented by functions/operations that want to receive
>  * a notification if a checkpoint has been timed out by the
>  * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator}.
>  */
> public interface CheckpointTimeoutListener {
>
>     /**
>      * This method is called as a notification if a distributed checkpoint has been timed out.
>      *
>      * @param checkpointId The ID of the checkpoint that has been timed out.
>      * @throws Exception
>      */
>     void notifyCheckpointTimeout(long checkpointId) throws Exception;
> }
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
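To make the proposal concrete, here is a plain-Java sketch of a sink that stages files on checkpoint, commits them on `notifyCheckpointComplete`, and hands them to the next checkpoint on `notifyCheckpointTimeout`. It is a simulation, not Flink code: both interfaces are local stand-ins (the timeout one is only the interface proposed in this issue, not an existing Flink API), and `RollForwardSink`, `write`, and `snapshot` are hypothetical names. It assumes at most one checkpoint is in flight; with concurrent checkpoints, a later snapshot can be taken before the timeout notification arrives, which is the difficulty discussed in this thread.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Local stand-in for Flink's CheckpointListener, reduced to the one callback used here.
interface CheckpointListener {
    void notifyCheckpointComplete(long checkpointId) throws Exception;
}

// The interface proposed in this issue; it is not part of Flink.
interface CheckpointTimeoutListener {
    void notifyCheckpointTimeout(long checkpointId) throws Exception;
}

// Hypothetical sink: stages buffered files on snapshot, commits them when the
// checkpoint completes, and re-stages them with the next snapshot on timeout.
class RollForwardSink implements CheckpointListener, CheckpointTimeoutListener {
    private final List<String> buffer = new ArrayList<>();          // written since the last snapshot
    private final List<String> carryOver = new ArrayList<>();       // orphaned by a timed-out checkpoint
    private final Map<Long, List<String>> staged = new TreeMap<>(); // checkpoint id -> staged files
    final Set<String> committed = new HashSet<>();

    void write(String file) {
        buffer.add(file);
    }

    // Called when checkpoint `checkpointId` is taken (assumes no other checkpoint is in flight).
    void snapshot(long checkpointId) {
        List<String> files = new ArrayList<>(carryOver);
        files.addAll(buffer);
        carryOver.clear();
        buffer.clear();
        staged.put(checkpointId, files);
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        List<String> files = staged.remove(checkpointId);
        if (files != null) {
            committed.addAll(files);
        }
    }

    @Override
    public void notifyCheckpointTimeout(long checkpointId) {
        // The timed-out checkpoint's files become the next checkpoint's responsibility.
        List<String> files = staged.remove(checkpointId);
        if (files != null) {
            carryOver.addAll(files);
        }
    }
}
```

For example, if checkpoint 1 completes, checkpoint 2 times out, and checkpoint 3 completes, the file staged for checkpoint 2 is committed together with checkpoint 3's files.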
[jira] [Commented] (FLINK-6315) Notify on checkpoint timeout
[ https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976699#comment-15976699 ]

ASF GitHub Bot commented on FLINK-6315:
---------------------------------------

Github user sjwiesman closed the pull request at:

    https://github.com/apache/flink/pull/3729
[jira] [Commented] (FLINK-6315) Notify on checkpoint timeout
[ https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976688#comment-15976688 ]

Seth Wiesman commented on FLINK-6315:
-------------------------------------

Thank you for talking this through with me. I'm going to close this issue and the PR. I think I can use some of this to make the eventually consistent sink work with concurrent checkpoints; we'll see if it works. Otherwise, I will document the sink as only being consistent with one concurrent checkpoint.
[jira] [Commented] (FLINK-6315) Notify on checkpoint timeout
[ https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976368#comment-15976368 ]

Stephan Ewen commented on FLINK-6315:
-------------------------------------

Incremental checkpointing works with more than one concurrent checkpoint; you simply have some overlap. By the time checkpoint *n* starts, checkpoint *n-1* may not be complete, but there will surely be some other complete checkpoint, and checkpoint *n* bases its diff on that one.

If no checkpoint ever finishes before the next starts, each diff is checkpointed multiple times. It still works, but it is something of an anti-pattern, because it does redundant work.

We could make the same assumption for the eventually consistent bucketing sink.

FWIW: I think users rarely run with more than one concurrent checkpoint.
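A toy model of the redundancy described above (this is not Flink's incremental checkpointing code): checkpoint *n* uploads every diff produced after its base, the newest checkpoint that had already completed when *n* started. `IncrementalUploads`, `uploadCounts`, and the `baseOf` schedules are hypothetical.

```java
import java.util.Arrays;

class IncrementalUploads {
    /**
     * baseOf[n - 1] is the newest checkpoint that had completed when checkpoint n started
     * (0 if none). Diff d is the state change produced just before checkpoint d.
     * Returns counts where counts[d - 1] is how often diff d is uploaded.
     */
    static int[] uploadCounts(int[] baseOf) {
        int n = baseOf.length;
        int[] counts = new int[n];
        for (int cp = 1; cp <= n; cp++) {
            // Checkpoint cp uploads every diff after its base, up to and including its own.
            for (int diff = baseOf[cp - 1] + 1; diff <= cp; diff++) {
                counts[diff - 1]++;
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Each checkpoint completes before the next starts: the base of n is n-1.
        System.out.println(Arrays.toString(uploadCounts(new int[] {0, 1, 2, 3}))); // prints [1, 1, 1, 1]
        // Each checkpoint completes only after the next has started: the base of n is n-2.
        System.out.println(Arrays.toString(uploadCounts(new int[] {0, 0, 1, 2}))); // prints [2, 2, 2, 1]
    }
}
```

In the second schedule each diff is uploaded twice (the last one only once, because nothing follows it in this short run): the work is redundant, but no diff is lost.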
[jira] [Commented] (FLINK-6315) Notify on checkpoint timeout
[ https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975434#comment-15975434 ]

Seth Wiesman commented on FLINK-6315:
-------------------------------------

[~StephanEwen] So I guess this is the crux of the real issue: eventually consistent file systems cannot perform renames, only PUTs. If one checkpoint times out, the next one needs to re-PUT its files into the next valid bucket.

I had considered something similar to what you are doing with incremental checkpointing: if checkpoint 3 arrives before 2 finishes, then build on checkpoint 1. The reason I backed away from it is that in the degenerate case, where no checkpoint ever completes before the next begins, no buckets would be committed and no data would ever be deleted from the buffer. Is this a case you deal with for incremental checkpointing, or is checkpointing that frequently considered an anti-pattern and not dealt with?

Of course, the easy solution would be to say that the `EventualyConsistentSink` only provides exactly-once when `maxConcurrentCheckpoints` is set to 1, but that doesn't feel like a satisfactory answer to me.
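With `maxConcurrentCheckpoints` set to 1 (configured in Flink through `CheckpointConfig#setMaxConcurrentCheckpoints`), timeout detection becomes local: if checkpoint *n* is taken while checkpoint *n-1* is still unconfirmed, *n-1* cannot be relied upon, so its files are rolled into *n*. A minimal sketch, assuming a single in-flight checkpoint; `SingleInFlightTracker` and its methods are hypothetical. Per Stephan's note in this thread that completion messages can be lost, the tracker can only conclude that a checkpoint was not confirmed to this task, not that it timed out.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper for a sink that runs with at most one checkpoint in flight.
 * If checkpoint n is taken while the previous one is still unconfirmed, the
 * previous one's files are handed to checkpoint n.
 */
class SingleInFlightTracker {
    private long inFlightId = -1;                        // -1: nothing awaiting confirmation
    private List<String> inFlightFiles = new ArrayList<>();
    private final List<String> buffer = new ArrayList<>();
    final List<Long> presumedFailed = new ArrayList<>(); // unconfirmed checkpoints, for inspection

    void write(String file) {
        buffer.add(file);
    }

    /** Returns the files that checkpoint `checkpointId` has to upload. */
    List<String> snapshot(long checkpointId) {
        List<String> toUpload = new ArrayList<>();
        if (inFlightId >= 0) {
            // The predecessor was never confirmed to this task (timed out, failed,
            // or its notification was lost): re-upload its files with this checkpoint.
            presumedFailed.add(inFlightId);
            toUpload.addAll(inFlightFiles);
        }
        toUpload.addAll(buffer);
        buffer.clear();
        inFlightId = checkpointId;
        inFlightFiles = new ArrayList<>(toUpload);
        return toUpload;
    }

    void notifyCheckpointComplete(long checkpointId) {
        if (checkpointId == inFlightId) {
            inFlightId = -1;
            inFlightFiles = new ArrayList<>();
        }
    }
}
```

The inference only holds because a second checkpoint cannot start while the first is pending; with concurrent checkpoints an unconfirmed predecessor may simply still be running.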
[jira] [Commented] (FLINK-6315) Notify on checkpoint timeout
[ https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975318#comment-15975318 ]

Aljoscha Krettek commented on FLINK-6315:
-----------------------------------------

In the {{BucketingSink}}, we move "pending" files to "final" when we get a checkpoint-complete notification. When restoring (from a failure or from a savepoint), we also move to "final" any pending files that were confirmed by a checkpoint but were not moved because a failure happened before the completion notification was received.

TL;DR: Either the notify or a restore does the move from "pending" to "final".
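A simplified model of the behavior described above, not the actual {{BucketingSink}} implementation: files are finalized either when the completion notification arrives or, if it was lost, when the job restores from the checkpoint that covered them. `PendingToFinal` and its methods are hypothetical, and an in-memory map stands in for the file system.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

class PendingToFinal {
    enum FileState { PENDING, FINAL }

    // Stand-in for the file system: file name -> state.
    final Map<String, FileState> fs = new TreeMap<>();
    // Operator state: pending files grouped by the checkpoint that covered them.
    final SortedMap<Long, List<String>> pendingPerCheckpoint = new TreeMap<>();
    private final List<String> pendingSinceLastCheckpoint = new ArrayList<>();

    void closePartFile(String name) {
        fs.put(name, FileState.PENDING);
        pendingSinceLastCheckpoint.add(name);
    }

    void snapshotState(long checkpointId) {
        pendingPerCheckpoint.put(checkpointId, new ArrayList<>(pendingSinceLastCheckpoint));
        pendingSinceLastCheckpoint.clear();
    }

    // Finalize everything covered by checkpoints up to and including checkpointId.
    void notifyCheckpointComplete(long checkpointId) {
        SortedMap<Long, List<String>> done = pendingPerCheckpoint.headMap(checkpointId + 1);
        done.values().forEach(files -> files.forEach(f -> fs.put(f, FileState.FINAL)));
        done.clear(); // removes the entries from the backing map
    }

    // Restored state belongs to a completed checkpoint, so its pending files can be
    // finalized even if notifyCheckpointComplete never arrived.
    void restoreState(SortedMap<Long, List<String>> restored, long restoredCheckpointId) {
        pendingPerCheckpoint.clear();
        pendingPerCheckpoint.putAll(restored);
        notifyCheckpointComplete(restoredCheckpointId);
    }
}
```

Because a restore re-runs the finalization that a lost notification would have triggered, the move has to be idempotent; here it is, since marking a file FINAL twice has no further effect.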
[jira] [Commented] (FLINK-6315) Notify on checkpoint timeout
[ https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975203#comment-15975203 ]

Stephan Ewen commented on FLINK-6315:
-------------------------------------

I think you are thinking about it the right way. When checkpoint 2 does not happen for whatever reason, checkpoint 3 should be in charge of everything since the last successful checkpoint.

I see the problem now: when checkpoint 3 starts, you may not yet know whether checkpoint 2 is actually going to complete. To make it trickier, checkpoint 2 may actually fail (due to a timeout) after checkpoint 3 completes.

In the incremental checkpointing code, we have a similar problem. There, we can only re-reference a diff if it is part of a completed checkpoint. For example, if checkpoint 2 is not complete when checkpoint 3 is started, then checkpoint 3 builds on checkpoint 1, not on checkpoint 2.

[~aljoscha] How is that handled in the regular bucketing sink?
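The "build on the last completed checkpoint" rule can also be applied to a sink's buffer, which removes the need for any timeout notification. In this sketch (hypothetical names, not Flink code), each snapshot covers every record written since the newest confirmed checkpoint; a completion confirms everything up to its snapshot and subsumes all older checkpoints, so checkpoint 2 timing out after checkpoint 3 completed requires no action. To keep it short, the log is never truncated.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/**
 * Hypothetical sink buffer applying "build on the last completed checkpoint":
 * a snapshot covers every record since the newest confirmed checkpoint.
 */
class RollForwardLog {
    private final List<String> log = new ArrayList<>();              // records in write order
    private final TreeMap<Long, Integer> endOffset = new TreeMap<>(); // checkpoint id -> log size at snapshot
    private int confirmedOffset = 0;                                  // prefix covered by a confirmed checkpoint

    void write(String record) {
        log.add(record);
    }

    /** Records that checkpoint `checkpointId` has to persist. */
    List<String> snapshot(long checkpointId) {
        endOffset.put(checkpointId, log.size());
        return new ArrayList<>(log.subList(confirmedOffset, log.size()));
    }

    void notifyCheckpointComplete(long checkpointId) {
        Integer end = endOffset.get(checkpointId);
        if (end != null) {
            confirmedOffset = Math.max(confirmedOffset, end);
        }
        // Older checkpoints are subsumed: whether they later complete, time out, or fail is irrelevant.
        endOffset.headMap(checkpointId, true).clear();
    }

    /** Records not yet covered by any confirmed checkpoint. */
    List<String> unconfirmed() {
        return new ArrayList<>(log.subList(confirmedOffset, log.size()));
    }
}
```

For example, after `a` is confirmed by checkpoint 1, writing `b` before snapshot 2 and `c` before snapshot 3 makes snapshot 3 contain both `b` and `c`, because checkpoint 2 is still unconfirmed; confirming checkpoint 3 then covers both, whatever later happens to checkpoint 2. The price is the redundant upload of `b`.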
[jira] [Commented] (FLINK-6315) Notify on checkpoint timeout
[ https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974984#comment-15974984 ]

Seth Wiesman commented on FLINK-6315:
-------------------------------------

[~StephanEwen] Regarding the first two points, that makes a lot of sense; thank you for clarifying. Perhaps there are some more subtleties to checkpointing than I am currently aware of.

As I currently understand it, a checkpoint may fail either due to a timeout or due to a task failure. A task failure leads to the job restarting from the last valid checkpoint. If that happens, then all data that was processed after that checkpoint will be reprocessed and everything should "just work".

The specific case that I am concerned about is when one checkpoint is timed out by the checkpoint coordinator but then the next one succeeds. Assume that checkpoint 1 is successful, checkpoint 2 times out, and then checkpoint 3 is successful. In this case, the data processed between checkpoints 1 and 2 is never reprocessed. Because I cannot assume that any particular instance of an operator completed checkpoint 2, I am looking for a way to push their responsibilities into checkpoint 3.

Does this make sense, or do I have some misunderstanding of checkpoint failures?
[jira] [Commented] (FLINK-6315) Notify on checkpoint timeout
[ https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974912#comment-15974912 ]

Stephan Ewen commented on FLINK-6315:
-------------------------------------

[~sjwiesman] Adding a few bits of information here about the behavior of checkpoints:

  - The {{notifyCheckpointComplete}} messages are always sent out, but if there is a failure after the master "commits" the checkpoint and before the messages arrive, then all TaskManagers may cancel their tasks and no task will receive that message.
  - The same may hold for any {{notifyCheckpointTimeout}} message.
  - It may be that some tasks complete their checkpoint and others fail before completing theirs. In that case, the checkpoint is neither complete nor timed out; it has simply failed.

I am wondering: should a timeout be handled differently from any other failure?
[jira] [Commented] (FLINK-6315) Notify on checkpoint timeout
[ https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974811#comment-15974811 ]

Seth Wiesman commented on FLINK-6315:
-------------------------------------

[~aljoscha] Apologies, I didn't see this comment. I might have jumped the gun a little bit sending in this PR.

In an eventually consistent file system, PUT operations are the only ones that can be considered valid. That means that invalid data can never be deleted, so instead valid data is marked valid once it has been fully checkpointed.

Each sink buffers data locally. On checkpoint, that data is copied to the final FS (say S3) into a bucket whose path combines the checkpoint ID and the timestamp at which the checkpoint was initiated, so that it is always unique. When the checkpoint is completed, a single flag file is written to the bucket, marking it valid. In practice, every operator can attempt to write this file to the bucket; even though overwriting a file is considered an inconsistent operation, all operators are writing identical empty files, so it's not actually an issue. This means that I only rely on a single operator ever receiving the checkpoint complete message. I was not aware of the possibility that no operator would receive the message, so I may need to rethink this bit.

As far as notifyCheckpointTimeout goes, each operator keeps track of which files it has uploaded for each checkpoint. If checkpoint A times out, then those files must be re-uploaded on checkpoint B; there is a diagram on FLINK-6306 showing this process. If the number of concurrent checkpoints is set to 1, this is trivial: if a new checkpoint begins before the last one completed, the last one must have timed out. This becomes more difficult when concurrent checkpoints are thrown into the mix; I need a way to signal that a previous checkpoint has timed out and that it is up to the next one to upload those files.
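A toy model of the bucket and flag-file convention described above, using an in-memory, PUT-only key-value map as a stand-in for S3. The path layout (`<base>/<checkpointId>-<timestamp>/`), the `_VALID` flag name, and the class and method names are assumptions for illustration, not the actual sink from FLINK-6306.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical PUT-only store plus the bucket/flag-file convention (all names assumed). */
class FlaggedBuckets {
    static final String FLAG = "_VALID";
    // Object store stand-in: key -> content. Only put() is used; nothing is deleted or renamed.
    private final Map<String, String> store = new TreeMap<>();

    // Unique per checkpoint attempt, so a re-upload never overwrites an earlier object.
    static String bucket(String base, long checkpointId, long checkpointTimestamp) {
        return base + "/" + checkpointId + "-" + checkpointTimestamp + "/";
    }

    void put(String key, String content) {
        store.put(key, content);
    }

    // Every subtask that sees the completion writes the same empty flag; duplicates are harmless.
    void markValid(String bucket) {
        put(bucket + FLAG, "");
    }

    // Readers only trust data files whose bucket carries the flag.
    Set<String> validDataFiles() {
        Set<String> valid = new TreeSet<>();
        for (String key : store.keySet()) {
            if (key.endsWith(FLAG)) {
                continue;
            }
            String bucket = key.substring(0, key.lastIndexOf('/') + 1);
            if (store.containsKey(bucket + FLAG)) {
                valid.add(key);
            }
        }
        return valid;
    }
}
```

A bucket whose checkpoint timed out is simply never flagged, so readers skip it without any delete, while its files are re-uploaded into the next checkpoint's bucket.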
[jira] [Commented] (FLINK-6315) Notify on checkpoint timeout
[ https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974756#comment-15974756 ]

Aljoscha Krettek commented on FLINK-6315:
-----------------------------------------

[~StephanEwen] do you have any comments on this?

[~sjwiesman] As a side comment, I think that committing in "notify" can be a bit dangerous because that call only happens on a best-effort basis. It can happen that Flink considers a checkpoint to be complete but the operators never receive a notify call. What would happen with your special sink in this case: we take a checkpoint, consider that checkpoint complete, fail to call notify, then fail the job and restart it from that checkpoint?
[jira] [Commented] (FLINK-6315) Notify on checkpoint timeout
[ https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974757#comment-15974757 ]

ASF GitHub Bot commented on FLINK-6315:
---------------------------------------

Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/3729

    I commented on the issue because we might need a general discussion about the feature first.
[jira] [Commented] (FLINK-6315) Notify on checkpoint timeout
[ https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974694#comment-15974694 ]

ASF GitHub Bot commented on FLINK-6315:
---------------------------------------

Github user sjwiesman commented on the issue:

    https://github.com/apache/flink/pull/3729

    @aljoscha Could you review this or point me in the direction of the correct person? I need this to land for the EventualConsistencySink
[jira] [Commented] (FLINK-6315) Notify on checkpoint timeout
[ https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971404#comment-15971404 ]

ASF GitHub Bot commented on FLINK-6315:
---------------------------------------

GitHub user sjwiesman opened a pull request:

    https://github.com/apache/flink/pull/3729

    [FLINK-6315] Notify on checkpoint timeout

    https://issues.apache.org/jira/browse/FLINK-6315

    A common use case when writing a custom operator that outputs data to some third-party location is to partially output on checkpoint and then commit on notifyCheckpointComplete. If that external system does not gracefully handle rollbacks (such as Amazon S3 not allowing consistent delete operations), then that data needs to be handled by the next checkpoint.

    The idea is to add a new interface similar to CheckpointListener that provides a callback when the CheckpointCoordinator times out a checkpoint.

    This is required for the eventually consistent sink coming in FLINK-6306 to be able to differentiate between concurrent checkpoints and timed-out checkpoints.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sjwiesman/flink FLINK-6315

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3729.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3729

commit 323851929772b4c57e65b7146e96af6687d3a2eb
Author: Seth Wiesman
Date:   2017-04-15T21:13:20Z

    FLINK-6315 Notify on checkpoint timeout

    https://issues.apache.org/jira/browse/FLINK-6315