[jira] [Commented] (FLINK-6315) Notify on checkpoint timeout

2017-04-20 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977103#comment-15977103
 ] 

Stephan Ewen commented on FLINK-6315:
-

Please ping us anytime you have questions on this one...

> Notify on checkpoint timeout 
> -
>
> Key: FLINK-6315
> URL: https://issues.apache.org/jira/browse/FLINK-6315
> Project: Flink
>  Issue Type: New Feature
>  Components: Core
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>
> A common use case when writing a custom operator that outputs data to some 
> third-party location is to partially output on checkpoint and then commit on 
> notifyCheckpointComplete. If that external system does not gracefully handle 
> rollbacks (such as Amazon S3 not allowing consistent delete operations), then 
> that data needs to be handled by the next checkpoint. 
> The idea is to add a new interface, similar to CheckpointListener, that 
> provides a callback when the CheckpointCoordinator times out a checkpoint:
> {code:java}
> /**
>  * This interface must be implemented by functions/operations that want to
>  * receive a notification if a checkpoint has been timed out by the
>  * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator}.
>  */
> public interface CheckpointTimeoutListener {
>   /**
>    * This method is called as a notification if a distributed checkpoint
>    * has been timed out.
>    *
>    * @param checkpointId The ID of the checkpoint that has been timed out.
>    * @throws Exception
>    */
>   void notifyCheckpointTimeout(long checkpointId) throws Exception;
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6315) Notify on checkpoint timeout

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976699#comment-15976699
 ] 

ASF GitHub Bot commented on FLINK-6315:
---

Github user sjwiesman closed the pull request at:

https://github.com/apache/flink/pull/3729




[jira] [Commented] (FLINK-6315) Notify on checkpoint timeout

2017-04-20 Thread Seth Wiesman (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976688#comment-15976688
 ] 

Seth Wiesman commented on FLINK-6315:
-

Thank you for talking this through with me. I'm going to close this issue and 
the PR. I think I can use some of this to make the eventually consistent sink 
work with concurrent checkpoints. We'll see if it works; otherwise, I will 
document the sink as only being consistent with one concurrent checkpoint. 



[jira] [Commented] (FLINK-6315) Notify on checkpoint timeout

2017-04-20 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976368#comment-15976368
 ] 

Stephan Ewen commented on FLINK-6315:
-

For the incremental checkpointing, it works with more than one concurrent 
checkpoint. You simply have some overlap.

By the time checkpoint *n* starts, it may be that checkpoint *n-1* is not 
complete, but there will surely be some other complete checkpoint. It will base 
its diff on that one then.
If no checkpoint ever finishes before the next starts, you will have each diff 
checkpointed multiple times. So it still works, but it is kind of an 
antipattern, because it does redundant work.
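
The base-selection rule described here can be sketched as a small in-memory model. This is only an illustration under stated assumptions, not Flink's incremental checkpointing code: the class `IncrementalBaseSketch`, its method names, and the convention that checkpoint IDs start at 1 (with 0 meaning "no base yet") are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical model of "diff against the latest completed checkpoint". */
class IncrementalBaseSketch {
    // IDs of checkpoints that are known to be complete.
    private final TreeSet<Long> completed = new TreeSet<>();
    // Changes grouped by the first checkpoint that captures them.
    private final TreeMap<Long, List<String>> changesByCheckpoint = new TreeMap<>();

    void recordChange(long checkpointId, String change) {
        changesByCheckpoint.computeIfAbsent(checkpointId, k -> new ArrayList<>()).add(change);
    }

    void markComplete(long checkpointId) {
        completed.add(checkpointId);
    }

    /** Latest completed checkpoint strictly older than the given one, or 0 if none. */
    long baseFor(long checkpointId) {
        Long base = completed.lower(checkpointId);
        return base == null ? 0L : base;
    }

    /** Everything changed after the base, up to and including this checkpoint. */
    List<String> diffFor(long checkpointId) {
        long base = baseFor(checkpointId);
        List<String> diff = new ArrayList<>();
        for (List<String> changes :
                changesByCheckpoint.subMap(base, false, checkpointId, true).values()) {
            diff.addAll(changes);
        }
        return diff;
    }
}
```

If checkpoint 2 is still pending when checkpoint 3 starts, `diffFor(3)` also contains the changes that checkpoint 2 already captured. That overlap is the redundant work mentioned above.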

We could make the same assumption for the eventually consistent bucketing sink.

FWIW: I think users rarely run with more than one concurrent checkpoint.



[jira] [Commented] (FLINK-6315) Notify on checkpoint timeout

2017-04-19 Thread Seth Wiesman (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975434#comment-15975434
 ] 

Seth Wiesman commented on FLINK-6315:
-

[~StephanEwen] 

So I guess this is the crux of the real issue: eventually consistent file 
systems cannot perform renames, only PUTs. If one checkpoint times out, the 
next needs to re-PUT its files into the next valid bucket. 

I had considered something similar to what you are doing with incremental 
checkpointing: if checkpoint 3 arrives before 2 finishes, then build on 
checkpoint 1. The reason I backed away from it is that in the degenerate case 
where no checkpoint ever completes before the next begins, no buckets would be 
committed and no data would ever be deleted from the buffer. Is this a case you 
deal with for incremental checkpointing, or is checkpointing that frequently 
considered an anti-pattern and not dealt with? 

Of course, the easy solution would be to say that the `EventualyConsistentSink` 
only provides exactly-once when `maxConcurrentCheckpoints` is set to 1, but that 
doesn't feel like a satisfactory answer to me. 




[jira] [Commented] (FLINK-6315) Notify on checkpoint timeout

2017-04-19 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975318#comment-15975318
 ] 

Aljoscha Krettek commented on FLINK-6315:
-

In the {{BucketingSink}}, we move "pending" files to "final" when we get a 
checkpoint complete notification. When restoring (from a failure or from a 
savepoint) we also move pending files that were confirmed by a checkpoint but 
have not been moved because we had a failure before receiving the completion 
notification to "final".

TL;DR
Either the notify or a restore does the move from "pending" to "final".
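
The two commit paths can be sketched with an in-memory stand-in. This is not the actual {{BucketingSink}} implementation: the class `PendingFilesSketch` and its methods are hypothetical, and a real sink would read the pending files from restored state rather than from a field that survives the failure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical model: both notify and restore move "pending" files to "final". */
class PendingFilesSketch {
    // Pending files, keyed by the checkpoint that snapshotted them.
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    private final List<String> finalFiles = new ArrayList<>();

    void addPending(long checkpointId, String file) {
        pending.computeIfAbsent(checkpointId, k -> new ArrayList<>()).add(file);
    }

    /** Best-effort path: the completion notification actually arrived. */
    void notifyCheckpointComplete(long checkpointId) {
        commitUpTo(checkpointId);
    }

    /** Recovery path: a checkpoint we restore from must have been completed. */
    void restore(long restoredCheckpointId) {
        commitUpTo(restoredCheckpointId);
    }

    private void commitUpTo(long checkpointId) {
        // View of all pending entries with an ID <= checkpointId.
        NavigableMap<Long, List<String>> confirmed = pending.headMap(checkpointId, true);
        for (List<String> files : confirmed.values()) {
            finalFiles.addAll(files);
        }
        confirmed.clear(); // clearing the view removes the entries from 'pending'
    }

    List<String> finalFiles() {
        return finalFiles;
    }

    boolean hasPending() {
        return !pending.isEmpty();
    }
}
```

Because `commitUpTo` removes what it commits, calling it from both paths for the same checkpoint moves each file only once.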



[jira] [Commented] (FLINK-6315) Notify on checkpoint timeout

2017-04-19 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975203#comment-15975203
 ] 

Stephan Ewen commented on FLINK-6315:
-

I think you are thinking about it the right way. When checkpoint 2 does not 
happen for whatever reason then checkpoint 3 should be in charge of everything 
since the last successful checkpoint.

I see the problem now: When checkpoint 3 starts, you may not yet know whether 
checkpoint 2 is actually going to complete. To make it more tricky, it may 
actually be that checkpoint 2 fails (due to a timeout) after checkpoint 3 
completes.

In the incremental checkpointing code, we have a similar problem. In that case, 
we can only re-reference a diff if it is part of a completed checkpoint. If for 
example checkpoint 2 is not complete when checkpoint 3 is started, then 
checkpoint 3 builds on checkpoint 1, not on checkpoint 2.

[~aljoscha] How is that handled in the regular bucketing sink?



[jira] [Commented] (FLINK-6315) Notify on checkpoint timeout

2017-04-19 Thread Seth Wiesman (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974984#comment-15974984
 ] 

Seth Wiesman commented on FLINK-6315:
-

[~StephanEwen] 

Regarding the first two points, that makes a lot of sense; thank you for 
clarifying. Perhaps there are some more subtleties to checkpointing than I am 
currently aware of. As I currently understand it, a checkpoint may fail due to 
either a timeout or a task failure. In the case of a task failure, the job will 
restart from the last valid checkpoint. If that happens, then all data that was 
processed after that checkpoint will be reprocessed and everything should 
"just work". 

The specific case that I am concerned about is when one checkpoint is timed out 
by the checkpoint coordinator but then the next one succeeds. Assume that 
checkpoint 1 is successful, checkpoint 2 times out, and then checkpoint 3 is 
successful. In this case, the data processed between checkpoints 1 and 2 is 
never reprocessed. Because I cannot assume that any particular instance of an 
operator completed checkpoint 2, I am looking for a way to push its 
responsibilities into checkpoint 3. 

Does this make sense, or do I have some misunderstanding of checkpoint 
failures? 



[jira] [Commented] (FLINK-6315) Notify on checkpoint timeout

2017-04-19 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974912#comment-15974912
 ] 

Stephan Ewen commented on FLINK-6315:
-

[~sjwiesman] Adding a few bits of information here about the behavior of 
checkpoints:

  - The {{notifyCheckpointComplete}} messages are sent out always, but if there 
is a failure between the point where the master "commits" the checkpoint and 
before messages arrive, then all TaskManagers may cancel their tasks and no 
task will receive that message.
  - The same may hold for any {{notifyCheckpointTimeout}} message.
  - It may be that some tasks complete their checkpoint, and others fail before 
completing theirs. In that case the checkpoint is neither complete nor timed 
out, simply failed.

I am wondering if a timeout should be handled differently to any other failure?




[jira] [Commented] (FLINK-6315) Notify on checkpoint timeout

2017-04-19 Thread Seth Wiesman (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974811#comment-15974811
 ] 

Seth Wiesman commented on FLINK-6315:
-

[~aljoscha] Apologies, I didn't see this comment. I might have jumped the gun a 
little bit by sending in this PR. 

In an eventually consistent file system, PUT operations are the only ones that 
can be considered valid. That means that invalid data can never be deleted. So 
instead valid data is marked valid when it has been fully checkpointed.

Each sink will buffer data locally. Then, on checkpoint, that data will be 
copied to the final FS (say S3) in a bucket whose path is a combination of the 
checkpoint ID and the timestamp of when the checkpoint was initiated, so that 
it is always unique. When the checkpoint is completed, a single flag file is 
written to the bucket, marking it valid. 

In practice, every operator can attempt to write this file to the bucket; even 
though overwriting a file is considered an inconsistent operation, all 
operators are writing identical empty files so it’s not actually an issue. This 
means that I am only reliant on a single operator ever receiving the checkpoint 
complete message. I was not aware of the possibility that no operator would 
receive the message so I may need to rethink this bit. 
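
A minimal sketch of the bucket layout and flag file described above, using an in-memory map as a stand-in for a PUT-only store. The class `PutOnlyBucketSketch`, the path format, and the marker name `_VALID` are assumptions made for illustration; the thread does not specify them.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical PUT-only store: nothing is renamed or deleted. */
class PutOnlyBucketSketch {
    // Hypothetical name for the empty flag file that marks a bucket valid.
    static final String MARKER = "_VALID";

    // Stand-in for the remote store: object key -> content.
    private final Map<String, byte[]> store = new HashMap<>();

    /** Unique per checkpoint attempt: combines checkpoint ID and start timestamp. */
    static String bucketPath(long checkpointId, long checkpointTimestamp) {
        return "checkpoint-" + checkpointId + "-" + checkpointTimestamp + "/";
    }

    void upload(String bucket, String fileName, byte[] content) {
        store.put(bucket + fileName, content);
    }

    /** Every operator may call this; all of them write the same empty file. */
    void markValid(String bucket) {
        store.put(bucket + MARKER, new byte[0]);
    }

    /** Readers only trust buckets that contain the flag file. */
    boolean isValid(String bucket) {
        return store.containsKey(bucket + MARKER);
    }
}
```

Since every writer PUTs identical empty content, repeated calls to `markValid` leave the bucket in the same state, which is why the overwrite is harmless in this scheme.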

As far as notifyCheckpointTimeout goes, each operator keeps track of which 
files it has uploaded for each checkpoint. If checkpoint A times out, then those 
files must be re-uploaded on checkpoint B; there is a diagram on FLINK-6306 
showing this process. If the number of concurrent checkpoints is set to 1, then 
this is trivial: if a new checkpoint begins before the last one completed, the 
last one must have timed out. This becomes more difficult when concurrent 
checkpoints are thrown into the mix: I need a way to signal that a previous 
checkpoint has 
timed out and it is up to the next to upload those files.
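
The per-checkpoint bookkeeping could look roughly like the sketch below. It is an assumption-laden illustration, not the code from the pull request: `CheckpointTimeoutListener` is redeclared locally after the interface proposed in this issue, and `ReuploadOnTimeoutSketch` with its `write`/`snapshot` methods is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Local copy of the interface proposed in this issue. */
interface CheckpointTimeoutListener {
    void notifyCheckpointTimeout(long checkpointId) throws Exception;
}

/** Hypothetical sink bookkeeping: files of a timed-out checkpoint move to the next one. */
class ReuploadOnTimeoutSketch implements CheckpointTimeoutListener {
    // Files written since the last snapshot.
    private final List<String> buffer = new ArrayList<>();
    // Files uploaded per checkpoint that is neither complete nor timed out yet.
    private final Map<Long, List<String>> uploadedBy = new HashMap<>();
    // Files from timed-out checkpoints, waiting for the next snapshot.
    private final List<String> carryOver = new ArrayList<>();

    void write(String file) {
        buffer.add(file);
    }

    /** Returns the files uploaded as part of this checkpoint. */
    List<String> snapshot(long checkpointId) {
        List<String> files = new ArrayList<>(carryOver);
        files.addAll(buffer);
        carryOver.clear();
        buffer.clear();
        uploadedBy.put(checkpointId, files);
        return files;
    }

    void notifyCheckpointComplete(long checkpointId) {
        uploadedBy.remove(checkpointId); // committed, nothing left to track
    }

    @Override
    public void notifyCheckpointTimeout(long checkpointId) {
        List<String> files = uploadedBy.remove(checkpointId);
        if (files != null) {
            carryOver.addAll(files); // the next snapshot re-uploads these
        }
    }
}
```

With concurrent checkpoints, the timeout for checkpoint 2 may only arrive after checkpoint 3 has already taken its snapshot. In this sketch the files would then be carried into checkpoint 4 instead.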




[jira] [Commented] (FLINK-6315) Notify on checkpoint timeout

2017-04-19 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974756#comment-15974756
 ] 

Aljoscha Krettek commented on FLINK-6315:
-

[~StephanEwen] do you have any comments on this?

[~sjwiesman] As a side comment, I think that committing in "notify" can be a 
bit dangerous because that call only happens on a best-effort basis. It can 
happen that Flink considers a checkpoint to be complete but the operators might 
never receive a notify call. What would happen with your special sink in this 
case: we do a checkpoint, consider that checkpoint complete, fail to call 
notify, then fail the job and then restart a job from that checkpoint.



[jira] [Commented] (FLINK-6315) Notify on checkpoint timeout

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974757#comment-15974757
 ] 

ASF GitHub Bot commented on FLINK-6315:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3729
  
I commented on the issue because we might need a general discussion about 
the feature first.




[jira] [Commented] (FLINK-6315) Notify on checkpoint timeout

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974694#comment-15974694
 ] 

ASF GitHub Bot commented on FLINK-6315:
---

Github user sjwiesman commented on the issue:

https://github.com/apache/flink/pull/3729
  
@aljoscha Could you review this or point me in the direction of the correct 
person? I need this to land for the EventualConsistencySink




[jira] [Commented] (FLINK-6315) Notify on checkpoint timeout

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971404#comment-15971404
 ] 

ASF GitHub Bot commented on FLINK-6315:
---

GitHub user sjwiesman opened a pull request:

https://github.com/apache/flink/pull/3729

[FLINK-6315] Notify on checkpoint timeout

https://issues.apache.org/jira/browse/FLINK-6315

A common use case when writing a custom operator that outputs data to some 
third-party location is to partially output on checkpoint and then commit on 
notifyCheckpointComplete. If that external system does not gracefully handle 
rollbacks (such as Amazon S3 not allowing consistent delete operations), then 
that data needs to be handled by the next checkpoint.
The idea is to add a new interface, similar to CheckpointListener, that 
provides a callback when the CheckpointCoordinator times out a checkpoint.

This is required for the eventually consistent sink coming in FLINK-6306 to 
be able to differentiate between concurrent checkpoints and timed out 
checkpoints. 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sjwiesman/flink FLINK-6315

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3729.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3729


commit 323851929772b4c57e65b7146e96af6687d3a2eb
Author: Seth Wiesman 
Date:   2017-04-15T21:13:20Z

FLINK-6315 Notify on checkpoint timeout

https://issues.apache.org/jira/browse/FLINK-6315



