[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15574609#comment-15574609
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2608


> Add option for persistent checkpoints
> -
>
> Key: FLINK-4512
> URL: https://issues.apache.org/jira/browse/FLINK-4512
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Reporter: Ufuk Celebi
> Assignee: Ufuk Celebi
>
> Allow periodic checkpoints to be persisted by writing out their metadata.
> This is what we currently do for savepoints, but in the future checkpoints
> and savepoints are likely to diverge with respect to the guarantees they
> give for updatability, etc.
> This means that, in the long term, the difference between persistent
> checkpoints and savepoints will be that persistent checkpoints can only be
> restored with the same job settings (e.g. the same parallelism).
> Regular and persistent checkpoints should behave differently with respect to
> disposal in *globally* terminal job states (FINISHED, CANCELLED, FAILED):
> regular checkpoints are cleaned up in all of these cases, whereas persistent
> checkpoints are cleaned up only on FINISHED, possibly with an option to
> customize the behaviour on CANCELLED or FAILED.
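The disposal rules described in the issue can be summarized as a small decision function. This is a minimal sketch, not Flink's implementation: `TerminalState`, `CheckpointDisposal`, and `shouldDispose` are hypothetical names, and the optional customization for CANCELLED or FAILED is modeled as an opt-in set of extra states.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-in for the globally terminal job states named in the issue.
enum TerminalState { FINISHED, CANCELLED, FAILED }

final class CheckpointDisposal {

    private CheckpointDisposal() {}

    /** Builds the set of extra states (CANCELLED and/or FAILED) a user opted into. */
    static Set<TerminalState> alsoDisposeOn(TerminalState... states) {
        Set<TerminalState> result = EnumSet.noneOf(TerminalState.class);
        for (TerminalState state : states) {
            result.add(state);
        }
        return result;
    }

    /**
     * Returns whether a completed checkpoint's state should be disposed when the job
     * reaches the given globally terminal state.
     */
    static boolean shouldDispose(boolean persistent, Set<TerminalState> extraStates, TerminalState state) {
        if (!persistent) {
            // Regular checkpoints are cleaned up in every terminal state.
            return true;
        }
        // Persistent checkpoints are cleaned up on FINISHED, and on CANCELLED/FAILED only if opted in.
        return state == TerminalState.FINISHED || extraStates.contains(state);
    }
}
```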



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15571422#comment-15571422
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2608
  
The local Travis build passed except for an unrelated Cassandra test failure
(https://travis-ci.org/uce/flink/builds/167107635). Going to merge this.


[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15569028#comment-15569028
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2608
  
+1 to the general approach and the code

Some suggestions for name polishing:
  - How about renaming `DISCARD_ON_CANCELLATION` to
`DELETE_ON_CANCELLATION`? That sounds more explicit, suggesting cleanup and
actual file deletion.
  - Since all checkpoints are persistent (at least in HA), how about 
calling this `enableExternalizedCheckpoints()` rather than 
`enablePersistentCheckpoints()`?
  - I suggest dropping the method `enablePersistentCheckpoints()`
without a cleanup policy parameter. Whoever enables that feature should
explicitly think about which cleanup policy they want.
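The naming suggestions above (an explicit cleanup policy and no overload without one) could look roughly like this. This is a hedged sketch: `CheckpointConfigSketch` is a hypothetical class, and `RETAIN_ON_CANCELLATION` is an assumed counterpart to the proposed `DELETE_ON_CANCELLATION`, not a name taken from this thread.

```java
import java.util.Objects;

// Hypothetical cleanup policies. DELETE_ON_CANCELLATION is the name proposed above;
// RETAIN_ON_CANCELLATION is an assumed counterpart.
enum ExternalizedCheckpointCleanup { DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION }

final class CheckpointConfigSketch {

    // null means externalized checkpoints are disabled.
    private ExternalizedCheckpointCleanup cleanup;

    // Deliberately no no-argument overload: callers must choose a cleanup policy.
    void enableExternalizedCheckpoints(ExternalizedCheckpointCleanup cleanup) {
        this.cleanup = Objects.requireNonNull(cleanup, "cleanup policy");
    }

    boolean isExternalizedCheckpointsEnabled() {
        return cleanup != null;
    }

    boolean deleteOnCancellation() {
        return cleanup == ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION;
    }
}
```

Making the policy a required argument turns "forgot to decide" into a compile-time or fail-fast error instead of a silent default.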

For the future, can we get rid of the extra storage location for the
externalized checkpoint metadata and simply store it in the checkpoint
directory as well? That would make it simpler for users to track and clean up
checkpoints manually if they want to retain externalized checkpoints across
cancellations and terminal failures.
  - Both the config value and the location parameter of
`enablePersistentCheckpoints()` would be dropped.
  - This would imply that every state backend needs to be able to provide a
storage location for the checkpoint metadata.
  - The memory state backend would hence not work with externalized
checkpoints unless one explicitly sets a location via
`setExternalizedCheckpointsLocation(uri)`.
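The follow-up proposal (each state backend supplies the metadata location, and the memory backend only does so when configured explicitly) can be sketched as below. All types here are hypothetical; only the `setExternalizedCheckpointsLocation(uri)` name comes from the comment above, and its signature (a `String` URI) is an assumption.

```java
import java.util.Optional;

// Hypothetical: a state backend may expose where externalized checkpoint metadata goes.
interface CheckpointMetadataLocation {
    Optional<String> metadataLocation();
}

// File-system backend: reuses its own checkpoint directory.
final class FsBackendSketch implements CheckpointMetadataLocation {
    private final String checkpointDirectory;

    FsBackendSketch(String checkpointDirectory) {
        this.checkpointDirectory = checkpointDirectory;
    }

    @Override
    public Optional<String> metadataLocation() {
        return Optional.of(checkpointDirectory);
    }
}

// Memory backend: has no durable directory unless one is set explicitly.
final class MemoryBackendSketch implements CheckpointMetadataLocation {
    private String externalizedLocation;

    void setExternalizedCheckpointsLocation(String uri) {
        this.externalizedLocation = uri;
    }

    @Override
    public Optional<String> metadataLocation() {
        return Optional.ofNullable(externalizedLocation);
    }
}

final class ExternalizedCheckpoints {
    private ExternalizedCheckpoints() {}

    // Fails fast if externalized checkpoints are used with a backend that has no location.
    static String resolve(CheckpointMetadataLocation backend) {
        return backend.metadataLocation().orElseThrow(() -> new IllegalStateException(
                "Externalized checkpoints need a metadata location; set one for this state backend."));
    }
}
```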

Since this is a bigger change, I would suggest a follow-up pull request for
it. The only change I would make to this pull request (to make the transition
to the follow-up smoother) is to remove the path parameter from the
`enablePersistentCheckpoints()` methods and always use the configuration value
(which will later be replaced by the state backend's storage location).


[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568366#comment-15568366
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2608
  
Addressed all comments except the exception narrowing, which I would like to
leave for follow-ups since those exceptions were not introduced with this PR.
Thanks again! I will wait for Travis and then merge this if there are no objections.


[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568348#comment-15568348
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82978898
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
 ---
@@ -256,29 +306,51 @@ public boolean acknowledgeTask(
 * Aborts a checkpoint because it expired (took too long).
 */
public void abortExpired() throws Exception {
--- End diff --

I would like to do this as a follow-up.


[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568326#comment-15568326
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82977296
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
 ---
@@ -43,24 +125,62 @@
 * @return The loaded savepoint
 * @throws Exception Failures during load are forwarded
 */
-   Savepoint loadSavepoint(String path) throws Exception;
+   public static Savepoint loadSavepoint(String path) throws IOException {
+   Preconditions.checkNotNull(path, "Path");
 
-   /**
-* Disposes the savepoint at the specified path.
-*
-* @param path Path of savepoint to dispose
-* @throws Exception Failures during disposal are forwarded
-*/
-   void disposeSavepoint(String path) throws Exception;
+   try (DataInputStream dis = new DataInputViewStreamWrapper(createFsInputStream(new Path(path)))) {
+   int magicNumber = dis.readInt();
+
+   if (magicNumber == MAGIC_NUMBER) {
+   int version = dis.readInt();
+
+   SavepointSerializer serializer = SavepointSerializers.getSerializer(version);
+   return serializer.deserialize(dis);
+   } else {
+   throw new RuntimeException("Unexpected magic number. This is most likely " +
+   "caused by trying to load a Flink 1.0 savepoint. You cannot load a " +
+   "savepoint triggered by Flink 1.0 with this version of Flink. If it is " +
+   "_not_ a Flink 1.0 savepoint, this error indicates that the specified " +
+   "file is not a proper savepoint or the file has been corrupted.");
+   }
+   }
+   }
 
/**
-* Shut downs the savepoint store.
+* Removes the savepoint meta data w/o loading and disposing it.
 *
-* Only necessary for implementations where the savepoint life-cycle is
-* bound to the cluster life-cycle.
-*
-* @throws Exception Failures during shut down are forwarded
+* @param path Path of savepoint to remove
+* @throws Exception Failures during disposal are forwarded
 */
-   void shutdown() throws Exception;
+   public static void removeSavepoint(String path) throws Exception {
--- End diff --

Changed
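The new static `loadSavepoint` validates a fixed header before handing the stream to a version-specific serializer. A minimal, self-contained sketch of that header round trip with plain `java.io` streams: the payload is treated as opaque bytes instead of going through `SavepointSerializer`, and this sketch raises an `IOException` (which the method already declares) where the diff uses a `RuntimeException`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

final class SavepointHeaderSketch {

    // Magic number as given in the diff.
    static final int MAGIC_NUMBER = 0x4960672d;

    private SavepointHeaderSketch() {}

    /** Writes MagicNumber, SavepointVersion and an opaque payload, in that order. */
    static byte[] write(int version, byte[] payload) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(MAGIC_NUMBER);
            out.writeInt(version);
            out.write(payload);
        }
        return bytes.toByteArray();
    }

    /** Validates the magic number and returns the version that selects the deserializer. */
    static int readVersion(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int magicNumber = in.readInt();
            if (magicNumber != MAGIC_NUMBER) {
                throw new IOException("Unexpected magic number. The file is most likely a Flink 1.0 "
                        + "savepoint, not a savepoint at all, or corrupted.");
            }
            return in.readInt();
        }
    }
}
```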


[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568325#comment-15568325
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82977266
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
 ---
@@ -18,23 +18,105 @@
 
 package org.apache.flink.runtime.checkpoint.savepoint;
 
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * Savepoint store used to persist {@link Savepoint} instances.
+ * A file system based savepoint store.
  *
- * The main implementation is the {@link FsSavepointStore}. We also have the
- * {@link HeapSavepointStore} for historical reasons (introduced in Flink 1.0).
+ * Stored savepoints have the following format:
+ * 
+ * MagicNumber SavepointVersion Savepoint
+ *   - MagicNumber => int
+ *   - SavepointVersion => int (returned by Savepoint#getVersion())
+ *   - Savepoint => bytes (serialized via version-specific SavepointSerializer)
+ * 
  */
-public interface SavepointStore {
+public class SavepointStore {
+
+   private static final Logger LOG = LoggerFactory.getLogger(SavepointStore.class);
+
+   /** Magic number for sanity checks against stored savepoints. */
+   private static final int MAGIC_NUMBER = 0x4960672d;
+
+   /** Prefix for savepoint files. */
+   private static final String prefix = "savepoint-";
 
/**
 * Stores the savepoint.
 *
+* @param targetDirectory Target directory to store savepoint in
 * @param savepoint Savepoint to be stored
 * @param <T> Savepoint type
 * @return Path of stored savepoint
 * @throws Exception Failures during store are forwarded
 */
-String storeSavepoint(T savepoint) throws Exception;
+   public static  String storeSavepoint(
+   String targetDirectory,
+   T savepoint) throws IOException {
+
+   checkNotNull(targetDirectory, "Target directory");
+   checkNotNull(savepoint, "Savepoint");
+
+   Exception latestException = null;
+   Path path = null;
+   FSDataOutputStream fdos = null;
+
+   FileSystem fs = null;
+
+   // Try to create a FS output stream
+   for (int attempt = 0; attempt < 10; attempt++) {
+   path = new Path(targetDirectory, FileUtils.getRandomFilename(prefix));
+
+   if (fs == null) {
+   fs = FileSystem.get(path.toUri());
+   }
+
+   try {
+   fdos = fs.create(path, false);
+   break;
+   } catch (Exception e) {
+   latestException = e;
+   }
+   }
+
+   if (fdos == null) {
+   throw new IOException("Failed to create file output stream at " + path, latestException);
+   }
+
+   boolean success = false;
+   try (DataOutputStream dos = new DataOutputStream(fdos)) {
+   // Write header
+   dos.writeInt(MAGIC_NUMBER);
+   dos.writeInt(savepoint.getVersion());
+
+   // Write savepoint
+   SavepointSerializer serializer = SavepointSerializers.getSerializer(savepoint);
+   serializer.serialize(savepoint, dos);
+   success = true;
+   } finally {
+   if (!success && fs.exists(path)) {
+   if (!fs.delete(path, true)) {
+   LOG.warn("Failed to delete file " + path + " after failed write.");
--- End diff --

Replaced here and below
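The `storeSavepoint` diff creates a file under a random name (up to 10 attempts) and deletes it again if the write fails. A sketch of the same pattern, with `java.nio.file` standing in for Flink's `FileSystem` abstraction and `UUID.randomUUID()` standing in for `FileUtils.getRandomFilename`; the class name is hypothetical.

```java
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

final class SavepointFileWriterSketch {

    private static final int MAGIC_NUMBER = 0x4960672d;
    private static final String PREFIX = "savepoint-";
    private static final int MAX_ATTEMPTS = 10;

    private SavepointFileWriterSketch() {}

    static Path store(Path targetDirectory, int version, byte[] payload) throws IOException {
        // Try a few random names; Files.createFile fails if the name is already taken.
        Path path = null;
        IOException latestException = null;
        for (int attempt = 0; attempt < MAX_ATTEMPTS && path == null; attempt++) {
            Path candidate = targetDirectory.resolve(PREFIX + UUID.randomUUID());
            try {
                path = Files.createFile(candidate);
            } catch (IOException e) {
                latestException = e;
            }
        }
        if (path == null) {
            throw new IOException("Failed to create savepoint file in " + targetDirectory, latestException);
        }

        boolean success = false;
        try {
            try (DataOutputStream out = new DataOutputStream(
                    new BufferedOutputStream(Files.newOutputStream(path)))) {
                out.writeInt(MAGIC_NUMBER);
                out.writeInt(version);
                out.write(payload);
            }
            // Only reached once the stream has been flushed and closed without error.
            success = true;
        } finally {
            if (!success) {
                try {
                    Files.deleteIfExists(path);
                } catch (IOException ignored) {
                    // Best-effort cleanup; let the original write failure propagate.
                }
            }
        }
        return path;
    }
}
```

`Files.createFile` performs the existence check and the creation as one atomic operation, so a name collision surfaces as an exception and simply triggers another attempt. Setting `success` only after the try-with-resources has closed the stream means a failure while flushing also triggers the cleanup.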


[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568319#comment-15568319
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82976869
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
 ---
@@ -48,13 +48,12 @@
public static CompletedCheckpoint loadAndValidateSavepoint(
JobID jobId,
Map tasks,
-   SavepointStore savepointStore,
String savepointPath) throws Exception {
 
// (1) load the savepoint
-   Savepoint savepoint = savepointStore.loadSavepoint(savepointPath);
+   Savepoint savepoint = SavepointStore.loadSavepoint(savepointPath);
final Map taskStates = new HashMap<>(savepoint.getTaskStates().size());
-   
+
// (2) validate it (parallelism, etc)
for (TaskState taskState : savepoint.getTaskStates()) {
ExecutionJobVertex executionJobVertex = tasks.get(taskState.getJobVertexID());
--- End diff --

Yes, updated.


[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568316#comment-15568316
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82976634
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 ---
@@ -233,68 +229,90 @@ public int getNumberOfRetainedCheckpoints() {
}
 
@Override
-   public void shutdown() throws Exception {
-   LOG.info("Shutting down");
+   public void shutdown(JobStatus jobStatus) throws Exception {
--- End diff --

Our only option would be to wrap it in our own exception, because Curator
throws the general `Exception`.
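Wrapping Curator's general `Exception` in a dedicated checked exception might look like this. Both `CheckpointStoreException` and the `ZooKeeperCalls.call` helper are hypothetical; `Callable` stands in for a Curator operation that declares `throws Exception`.

```java
import java.util.concurrent.Callable;

/** Hypothetical checked exception for completed-checkpoint store failures. */
final class CheckpointStoreException extends Exception {
    private static final long serialVersionUID = 1L;

    CheckpointStoreException(String message, Throwable cause) {
        super(message, cause);
    }
}

final class ZooKeeperCalls {

    private ZooKeeperCalls() {}

    /** Runs an operation that declares 'throws Exception' and narrows its failure type. */
    static <T> T call(String description, Callable<T> operation) throws CheckpointStoreException {
        try {
            return operation.call();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt status for callers further up the stack.
                Thread.currentThread().interrupt();
            }
            throw new CheckpointStoreException("ZooKeeper operation failed: " + description, e);
        }
    }
}
```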


[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568313#comment-15568313
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82976383
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 ---
@@ -172,7 +168,7 @@ public void recover() throws Exception {
 
for (int i = 0; i < numberOfInitialCheckpoints - 1; i++) {
try {
-   removeFromZooKeeperAndDiscardCheckpoint(initialCheckpoints.get(i));
+   removeSubsumed(initialCheckpoints.get(i));
--- End diff --

Yes. Even more, I think this is generally dangerous: if a checkpoint is
recovered but cannot be restored, we will have lost all the others. Since we
currently only keep a single one anyway, this is not a problem yet.
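One way to avoid losing all fallbacks is to discard older checkpoints only after a newer one has actually been restored. This is a sketch under assumptions: checkpoints are represented by their IDs (oldest first), and `tryRestore` is a hypothetical hook that reports whether restoring succeeded.

```java
import java.util.List;
import java.util.function.Predicate;

final class CheckpointRecoverySketch {

    private CheckpointRecoverySketch() {}

    /**
     * Tries checkpoints from newest to oldest and returns the first one that restores.
     * Only checkpoints older than the restored one are reported as removable.
     *
     * @param checkpointIds recovered checkpoint IDs, ordered oldest first
     * @param tryRestore    restore hook, returns true on success
     * @param removable     receives the IDs that are safe to discard afterwards
     * @return the restored checkpoint ID, or -1 if none could be restored
     */
    static long restoreLatestUsable(List<Long> checkpointIds, Predicate<Long> tryRestore, List<Long> removable) {
        for (int i = checkpointIds.size() - 1; i >= 0; i--) {
            long id = checkpointIds.get(i);
            if (tryRestore.test(id)) {
                // Only now is it safe to drop everything older than the restored checkpoint.
                removable.addAll(checkpointIds.subList(0, i));
                return id;
            }
        }
        return -1L;
    }
}
```

Newer checkpoints that failed to restore are left untouched here, so they can still be inspected.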


[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568246#comment-15568246
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82972407
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
 ---
@@ -64,9 +62,9 @@ public void recover() throws Exception {
 
@Override
public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
-   checkpoints.addLast(checkpoint);
+   checkpoints.add(checkpoint);
if (checkpoints.size() > maxNumberOfCheckpointsToRetain) {
-   checkpoints.removeFirst().discardState();
+   checkpoints.remove().subsume();
--- End diff --

Manually triggered savepoints, for example, are not discarded when they are
subsumed. The CheckpointProperties constructor is package-private (for testing),
and only the static factory methods (for persistent checkpoints, regular
checkpoints, and manually triggered savepoints) are publicly accessible. Let me
add a check to the properties that only allows manual discard if the checkpoint
is persisted.
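The factory-plus-check design described above can be sketched as follows. The class, field, and factory names are illustrative and do not claim to match the merged `CheckpointProperties`; the sketch only models the three property combinations mentioned in the comment.

```java
final class CheckpointPropertiesSketch {

    private final boolean persisted;
    private final boolean discardSubsumed;
    private final boolean discardCancelled;

    // Package-private so tests can build arbitrary combinations; everyone else uses the factories.
    CheckpointPropertiesSketch(boolean persisted, boolean discardSubsumed, boolean discardCancelled) {
        // State that is not discarded automatically must be discoverable later,
        // which requires persisted metadata.
        if (!persisted && !(discardSubsumed && discardCancelled)) {
            throw new IllegalArgumentException("Manual discard is only allowed for persisted checkpoints.");
        }
        this.persisted = persisted;
        this.discardSubsumed = discardSubsumed;
        this.discardCancelled = discardCancelled;
    }

    static CheckpointPropertiesSketch forStandardCheckpoint() {
        return new CheckpointPropertiesSketch(false, true, true);
    }

    static CheckpointPropertiesSketch forPersistentCheckpoint() {
        // Still subsumed by newer checkpoints, but survives cancellation.
        return new CheckpointPropertiesSketch(true, true, false);
    }

    static CheckpointPropertiesSketch forStandardSavepoint() {
        // Manually triggered savepoints are never discarded automatically.
        return new CheckpointPropertiesSketch(true, false, false);
    }

    boolean isPersisted() {
        return persisted;
    }

    boolean discardOnSubsumed() {
        return discardSubsumed;
    }

    boolean discardOnCancellation() {
        return discardCancelled;
    }
}
```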


[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568232#comment-15568232
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82971700
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
 ---
@@ -18,44 +18,243 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.runtime.jobgraph.JobStatus;
+
+import java.io.Serializable;
+
 /**
  * The configuration of a checkpoint, such as whether
  * 
- * The checkpoint is a savepoint
+ * The checkpoint should be persisted
  * The checkpoint must be full, or may be incremental
 * The checkpoint format must be the common (cross backend) format, or may be state-backend specific
--- End diff --

Added a note `(not yet implemented)` after those.


[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568226#comment-15568226
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82971361
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -282,29 +279,71 @@ public boolean isShutdown() {
//  Handling checkpoints and messages
// 

 
-   public Future triggerSavepoint(long timestamp) throws Exception {
-   CheckpointTriggerResult result = triggerCheckpoint(timestamp, CheckpointProperties.forStandardSavepoint());
+   /**
+* Triggers a savepoint with the default savepoint directory as a target.
+*
+* @param timestamp The timestamp for the savepoint.
+* @return A future to the completed checkpoint
+* @throws IllegalStateException If no default savepoint directory has been configured
+* @throws Exception Failures during triggering are forwarded
+*/
+   public Future triggerSavepoint(long timestamp) throws Exception {
+   return triggerSavepoint(timestamp, null);
+   }
+
+   /**
+* Triggers a savepoint with the given savepoint directory as a target.
+*
+* @param timestamp The timestamp for the savepoint.
+* @param savepointDirectory Target directory for the savepoint.
+* @return A future to the completed checkpoint
+* @throws IllegalStateException If no savepoint directory has been
+*   specified and no default savepoint directory has been
+*   configured
+* @throws Exception Failures during triggering are forwarded
+*/
+   public Future triggerSavepoint(long timestamp, String savepointDirectory) throws Exception {
+   String targetDirectory;
+   if (savepointDirectory != null) {
+   targetDirectory = savepointDirectory;
+   } else if (this.savepointDirectory != null) {
+   targetDirectory = this.savepointDirectory;
+   } else {
+   throw new IllegalStateException("No savepoint directory configured. " +
+   "You can either specify a directory when triggering this savepoint or " +
+   "configure a cluster-wide default via key '" +
+   ConfigConstants.SAVEPOINT_DIRECTORY_KEY + "'.");
+   }
+
+   CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
+   CheckpointTriggerResult result = triggerCheckpoint(timestamp, props, targetDirectory);
 
if (result.isSuccess()) {
-   PendingSavepoint savepoint = (PendingSavepoint) result.getPendingCheckpoint();
-   return savepoint.getCompletionFuture();
-   }
-   else {
-   return Futures.failed(new Exception("Failed to trigger savepoint: " + result.getFailureReason().message()));
+   return result.getPendingCheckpoint().getCompletionFuture();
+   } else {
+   CompletableFuture failed = new FlinkCompletableFuture<>();
+   failed.completeExceptionally(new Exception("Failed to trigger savepoint: " + result.getFailureReason().message()));
--- End diff --

`CheckpointDeclineReason` is not an `Exception`, but an enum of decline 
reasons.
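Because the decline reason is an enum rather than a `Throwable`, the diff wraps its message in a new `Exception`. A standalone sketch of the directory fallback and the failed future, using `java.util.concurrent.CompletableFuture` in place of Flink's future types; `DeclineReason` and its constants are hypothetical stand-ins for `CheckpointDeclineReason`.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for CheckpointDeclineReason: an enum with a message, not a Throwable.
enum DeclineReason {
    COORDINATOR_SHUT_DOWN("Checkpoint coordinator is shut down."),
    TASKS_NOT_RUNNING("Not all required tasks are currently running.");

    private final String message;

    DeclineReason(String message) {
        this.message = message;
    }

    String message() {
        return message;
    }
}

final class SavepointTriggerSketch {

    private SavepointTriggerSketch() {}

    /** An explicit directory wins, then the cluster-wide default; otherwise triggering fails. */
    static String resolveTargetDirectory(String requested, String configuredDefault) {
        if (requested != null) {
            return requested;
        }
        if (configuredDefault != null) {
            return configuredDefault;
        }
        throw new IllegalStateException("No savepoint directory configured. You can either specify "
                + "a directory when triggering the savepoint or configure a cluster-wide default.");
    }

    /** The enum cannot be used as a cause, so its message is wrapped in a new Exception. */
    static <T> CompletableFuture<T> failedTrigger(DeclineReason reason) {
        CompletableFuture<T> failed = new CompletableFuture<>();
        failed.completeExceptionally(new Exception("Failed to trigger savepoint: " + reason.message()));
        return failed;
    }
}
```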


> Add option for persistent checkpoints
> -
>
> Key: FLINK-4512
> URL: https://issues.apache.org/jira/browse/FLINK-4512
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> Allow periodic checkpoints to be persisted by writing out their meta data. 
> This is what we currently do for savepoints, but in the future checkpoints 
> and savepoints are likely to diverge with respect to guarantees they give for 
> updatability, etc.
> This means that the difference between persistent checkpoints and savepoints 
> in the long term will be that persistent checkpoints can only be restored 
> with the same job settings (like parallelism, etc.)
> Regular and persisted checkpoints should behave differently with respect to 
> disposal in *globally* terminal job states (FINISHED, CANCELLED, FAILED): 
> regular checkpoints are cleaned up in all of these cases whereas persistent 
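> checkpoints only on FINISHED. Maybe with the option to customize behaviour on 
> CANCELLED or FAILED.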

[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568224#comment-15568224
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82971228
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -282,29 +279,71 @@ public boolean isShutdown() {
//  Handling checkpoints and messages
// 

 
-   public Future triggerSavepoint(long timestamp) throws Exception {
-   CheckpointTriggerResult result = triggerCheckpoint(timestamp, CheckpointProperties.forStandardSavepoint());
+   /**
+* Triggers a savepoint with the default savepoint directory as a target.
+*
+* @param timestamp The timestamp for the savepoint.
+* @return A future to the completed checkpoint
+* @throws IllegalStateException If no default savepoint directory has been configured
+* @throws Exception Failures during triggering are forwarded
+*/
+   public Future triggerSavepoint(long timestamp) throws Exception {
+   return triggerSavepoint(timestamp, null);
+   }
+
+   /**
+* Triggers a savepoint with the given savepoint directory as a target.
+*
+* @param timestamp The timestamp for the savepoint.
+* @param savepointDirectory Target directory for the savepoint.
+* @return A future to the completed checkpoint
+* @throws IllegalStateException If no savepoint directory has been
+*   specified and no default savepoint directory has been
+*   configured
+* @throws Exception Failures during triggering are forwarded
+*/
+   public Future triggerSavepoint(long timestamp, String savepointDirectory) throws Exception {
+   String targetDirectory;
+   if (savepointDirectory != null) {
+   targetDirectory = savepointDirectory;
+   } else if (this.savepointDirectory != null) {
+   targetDirectory = this.savepointDirectory;
+   } else {
+   throw new IllegalStateException("No savepoint directory configured. " +
+   "You can either specify a directory when triggering this savepoint or " +
+   "configure a cluster-wide default via key '" +
+   ConfigConstants.SAVEPOINT_DIRECTORY_KEY + "'.");
+   }
+
+   CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
+   CheckpointTriggerResult result = triggerCheckpoint(timestamp, props, targetDirectory);
 
if (result.isSuccess()) {
-   PendingSavepoint savepoint = (PendingSavepoint) result.getPendingCheckpoint();
-   return savepoint.getCompletionFuture();
-   }
-   else {
-   return Futures.failed(new Exception("Failed to trigger savepoint: " + result.getFailureReason().message()));
+   return result.getPendingCheckpoint().getCompletionFuture();
+   } else {
+   CompletableFuture failed = new FlinkCompletableFuture<>();
--- End diff --

I was looking for that one, but didn't find it. Thanks! 





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568219#comment-15568219
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82970716
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -219,33 +245,9 @@ public CheckpointCoordinator(
 * Shuts down the checkpoint coordinator.
 *
 * After this method has been called, the coordinator does not accept
-* and further messages and cannot trigger any further checkpoints. All
-* checkpoint state is discarded.
-*/
-   public void shutdown() throws Exception {
-   shutdown(true);
-   }
-
-   /**
-* Suspends the checkpoint coordinator.
-*
-* After this method has been called, the coordinator does not accept
 * and further messages and cannot trigger any further checkpoints.
-*
-* The difference to shutdown is that checkpoint state in the store
-* and counter is kept around if possible to recover later.
-*/
-   public void suspend() throws Exception {
-   shutdown(false);
-   }
-
-   /**
-* Shuts down the checkpoint coordinator.
-*
-* @param shutdownStoreAndCounter Depending on this flag the checkpoint
-* state services are shut down or suspended.
 */
-   private void shutdown(boolean shutdownStoreAndCounter) throws Exception {
+   public void shutdown(JobStatus jobStatus) throws Exception {
--- End diff --

True, but I thought we kept it `shutdown` for consistency reasons.
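Folding `shutdown()` and `suspend()` into `shutdown(JobStatus)` means the job status now has to choose between the two former code paths. The following is a minimal sketch of that choice. It assumes that the globally terminal states (FINISHED, CANCELLED, FAILED in the issue description) map to the old `shutdown(true)` and that anything else, such as a suspended job, maps to the old `suspend()`. The enums and `onShutdown` are hypothetical and are not Flink's `JobStatus` API.

```java
final class CoordinatorShutdownSketch {

    /** Hypothetical subset of job states; SUSPENDED is terminal only locally. */
    enum Status {
        FINISHED(true), CANCELLED(true), FAILED(true), SUSPENDED(false);

        private final boolean globallyTerminal;

        Status(boolean globallyTerminal) {
            this.globallyTerminal = globallyTerminal;
        }

        boolean isGloballyTerminal() {
            return globallyTerminal;
        }
    }

    /** What happens to the completed-checkpoint store and the checkpoint ID counter. */
    enum StoreAction { SHUT_DOWN, SUSPEND }

    /**
     * Replaces the former shutdown()/suspend() pair: a globally terminal status
     * shuts the state services down; anything else keeps them for recovery.
     */
    static StoreAction onShutdown(Status status) {
        return status.isGloballyTerminal() ? StoreAction.SHUT_DOWN : StoreAction.SUSPEND;
    }
}
```

Whether a checkpoint's state is actually discarded on SHUT_DOWN would then still depend on its own properties, as the issue description requires for persistent checkpoints.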




[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568207#comment-15568207
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82969876
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -637,20 +637,25 @@ protected int savepoint(String[] args) {
"Specify a Job ID to trigger a savepoint."));
}
 
-   return triggerSavepoint(options, jobId);
+   String savepointDirectory = null;
+   if (cleanedArgs.length == 2) {
--- End diff --

Changed the check to `>= 2`, and a message is now printed when unneeded 
arguments are passed.
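The following is a minimal sketch of the argument handling described above. Based on the diff, it assumes that `cleanedArgs[0]` is the job ID and `cleanedArgs[1]` is the optional savepoint directory. `SavepointArgsSketch` and `parseSavepointDirectory` are hypothetical names, not the actual `CliFrontend` code, and the message wording is made up.

```java
import java.util.Arrays;

final class SavepointArgsSketch {

    /**
     * Extracts the optional savepoint directory from the cleaned CLI arguments.
     * Assumes index 0 holds the job ID and index 1 the directory; anything
     * beyond that is reported and ignored instead of being silently dropped.
     */
    static String parseSavepointDirectory(String[] cleanedArgs) {
        String savepointDirectory = null;
        if (cleanedArgs.length >= 2) {
            savepointDirectory = cleanedArgs[1];
        }
        if (cleanedArgs.length > 2) {
            String[] unneeded = Arrays.copyOfRange(cleanedArgs, 2, cleanedArgs.length);
            System.err.println("Ignoring unneeded arguments: " + String.join(" ", unneeded));
        }
        return savepointDirectory;
    }
}
```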




[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568171#comment-15568171
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2608
  
Thank you very much for the thorough review. I will address the comments 
now and update the PR. 




[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15565575#comment-15565575
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2608
  
Tested the changes locally. I took a savepoint with and without explicit 
savepoint directory and resumed from there. Both seemed to work without a 
problem :-)




[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15565543#comment-15565543
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82797030
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
 ---
@@ -43,24 +125,62 @@
 * @return The loaded savepoint
 * @throws Exception Failures during load are forwared
 */
-   Savepoint loadSavepoint(String path) throws Exception;
+   public static Savepoint loadSavepoint(String path) throws IOException {
+   Preconditions.checkNotNull(path, "Path");
 
-   /**
-* Disposes the savepoint at the specified path.
-*
-* @param pathPath of savepoint to dispose
-* @throws Exception Failures during diposal are forwarded
-*/
-   void disposeSavepoint(String path) throws Exception;
+   try (DataInputStream dis = new DataInputViewStreamWrapper(createFsInputStream(new Path(path)))) {
+   int magicNumber = dis.readInt();
+
+   if (magicNumber == MAGIC_NUMBER) {
+   int version = dis.readInt();
+
+   SavepointSerializer serializer = SavepointSerializers.getSerializer(version);
+   return serializer.deserialize(dis);
+   } else {
+   throw new RuntimeException("Unexpected magic number. This is most likely " +
+   "caused by trying to load a Flink 1.0 savepoint. You cannot load a " +
+   "savepoint triggered by Flink 1.0 with this version of Flink. If it is " +
+   "_not_ a Flink 1.0 savepoint, this error indicates that the specified " +
+   "file is not a proper savepoint or the file has been corrupted.");
+   }
+   }
+   }
 
/**
-* Shut downs the savepoint store.
+* Removes the savepoint meta data w/o loading and disposing it.
 *
-* Only necessary for implementations where the savepoint life-cycle is
-* bound to the cluster life-cycle.
-*
-* @throws Exception Failures during shut down are forwarded
+* @param path Path of savepoint to remove
+* @throws Exception Failures during disposal are forwarded
 */
-   void shutdown() throws Exception;
+   public static void removeSavepoint(String path) throws Exception {
--- End diff --

Could be `IOException`
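The header layout documented in the `SavepointStore` Javadoc (magic number, version, then the serialized savepoint) can be sketched with plain `java.io` streams. Each method in this sketch declares only `IOException`, in line with the suggestion. Several parts are assumptions for illustration. Byte arrays stand in for Flink's `FileSystem` streams, and a raw byte payload replaces the version-specific `SavepointSerializer`. `SavepointHeaderSketch` is a hypothetical name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

final class SavepointHeaderSketch {

    /** Same value as MAGIC_NUMBER in the SavepointStore diff. */
    static final int MAGIC_NUMBER = 0x4960672d;

    /** Writes MagicNumber, SavepointVersion, then the (already serialized) payload. */
    static byte[] write(int version, byte[] payload) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(bytes)) {
            dos.writeInt(MAGIC_NUMBER);
            dos.writeInt(version);
            dos.write(payload);
        }
        return bytes.toByteArray();
    }

    /** Validates the magic number and returns the version; declares only IOException. */
    static int readVersion(byte[] data) throws IOException {
        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(data))) {
            int magicNumber = dis.readInt();
            if (magicNumber != MAGIC_NUMBER) {
                throw new IOException("Unexpected magic number: not a savepoint file, or corrupted.");
            }
            return dis.readInt();
        }
    }
}
```

Unlike the diff, which throws a `RuntimeException` on a magic number mismatch, this sketch reports the mismatch as an `IOException`. That keeps the declared signature narrow.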




[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15565536#comment-15565536
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82781804
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
 ---
@@ -51,9 +57,11 @@
/** States of the different task groups belonging to this checkpoint */
private final Map taskStates;
 
-   /** Flag to indicate whether the completed checkpoint data should be deleted when this
-* handle to the checkpoint is disposed */
-   private final boolean deleteStateWhenDisposed;
+   /** Properties for this checkpoint. */
+   private final CheckpointProperties props;
+
+   /** External path if persisted checkpoint; null otherwise. */
+   private final String externalPath;
--- End diff --

Could the external path be part of the `CheckpointProperties`? The external 
path could be an `Option`. Then we would get around the null field.
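The following is a minimal sketch of the suggestion in Java terms, using `java.util.Optional` as the counterpart of Scala's `Option`. `ExternalPathSketch` and its nested `Props` class are hypothetical, not Flink's `CheckpointProperties`.

```java
import java.util.Objects;
import java.util.Optional;

final class ExternalPathSketch {

    /** Hypothetical properties holder carrying the external path as an Optional. */
    static final class Props {
        private final Optional<String> externalPath;

        Props(Optional<String> externalPath) {
            // The Optional itself must never be null; absence is Optional.empty().
            this.externalPath = Objects.requireNonNull(externalPath, "externalPath");
        }

        Optional<String> getExternalPath() {
            return externalPath;
        }
    }

    /** Callers branch on presence instead of null-checking a field. */
    static String describe(Props props) {
        return props.getExternalPath()
                .map(path -> "persisted at " + path)
                .orElse("not persisted");
    }
}
```

One caveat: `java.util.Optional` is not `Serializable`. If the properties object has to be serialized, a nullable field behind an accessor that returns `Optional` may be the more practical variant.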




[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15565537#comment-15565537
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82785784
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 ---
@@ -172,7 +168,7 @@ public void recover() throws Exception {
 
for (int i = 0; i < numberOfInitialCheckpoints - 1; i++) {
try {
-   removeFromZooKeeperAndDiscardCheckpoint(initialCheckpoints.get(i));
+   removeSubsumed(initialCheckpoints.get(i));
--- End diff --

Just a remark: in the future, we might not want to remove all previous 
checkpoints in case the youngest one is not recoverable.
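The following is a minimal sketch of the alternative the remark hints at. It makes two assumptions. First, checkpoints are ordered oldest first, as the loop above suggests by keeping only the last element. Second, some recoverability check exists. `youngestRecoverable` and the predicate are hypothetical, not part of `ZooKeeperCompletedCheckpointStore`.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

final class RecoveryFallbackSketch {

    /**
     * Scans checkpoints from youngest to oldest and returns the first one that
     * passes the recoverability check. Nothing is removed here, so older
     * checkpoints stay available if the youngest one turns out to be broken.
     */
    static <T> Optional<T> youngestRecoverable(List<T> oldestFirst, Predicate<? super T> isRecoverable) {
        for (int i = oldestFirst.size() - 1; i >= 0; i--) {
            T candidate = oldestFirst.get(i);
            if (isRecoverable.test(candidate)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }
}
```

Subsumed checkpoints could then be removed only after a recoverable one has been found.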




[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15565528#comment-15565528
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82784661
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
 ---
@@ -64,9 +62,9 @@ public void recover() throws Exception {
 
@Override
public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
-   checkpoints.addLast(checkpoint);
+   checkpoints.add(checkpoint);
if (checkpoints.size() > maxNumberOfCheckpointsToRetain) {
-   checkpoints.removeFirst().discardState();
+   checkpoints.remove().subsume();
--- End diff --

Maybe we shouldn't allow all different combinations of 
`CheckpointProperties`. Only those which make sense.
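One way to allow only the combinations that make sense is a private constructor plus named factory methods. The following is a minimal sketch. The flags encode the disposal rules from the issue description: regular checkpoints are cleaned up in every globally terminal state, and persistent checkpoints only on FINISHED. `CheckpointPropertiesSketch` and its flag names are hypothetical, not the fields of Flink's `CheckpointProperties`.

```java
final class CheckpointPropertiesSketch {

    private final boolean discardOnFinished;
    private final boolean discardOnCancelled;
    private final boolean discardOnFailed;

    // Private constructor: callers cannot assemble arbitrary flag combinations.
    private CheckpointPropertiesSketch(boolean onFinished, boolean onCancelled, boolean onFailed) {
        this.discardOnFinished = onFinished;
        this.discardOnCancelled = onCancelled;
        this.discardOnFailed = onFailed;
    }

    /** Regular checkpoint: cleaned up in every globally terminal job state. */
    static CheckpointPropertiesSketch regularCheckpoint() {
        return new CheckpointPropertiesSketch(true, true, true);
    }

    /** Persistent checkpoint: cleaned up only when the job FINISHED. */
    static CheckpointPropertiesSketch persistentCheckpoint() {
        return new CheckpointPropertiesSketch(true, false, false);
    }

    boolean discardOnFinished() { return discardOnFinished; }

    boolean discardOnCancelled() { return discardOnCancelled; }

    boolean discardOnFailed() { return discardOnFailed; }
}
```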




[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15565540#comment-15565540
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82796670
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
 ---
@@ -18,23 +18,105 @@
 
 package org.apache.flink.runtime.checkpoint.savepoint;
 
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * Savepoint store used to persist {@link Savepoint} instances.
+ * A file system based savepoint store.
  *
- * The main implementation is the {@link FsSavepointStore}. We also have the
- * {@link HeapSavepointStore} for historical reasons (introduced in Flink 1.0).
+ * Stored savepoints have the following format:
+ * 
+ * MagicNumber SavepointVersion Savepoint
+ *   - MagicNumber => int
+ *   - SavepointVersion => int (returned by Savepoint#getVersion())
+ *   - Savepoint => bytes (serialized via version-specific SavepointSerializer)
+ * 
  */
-public interface SavepointStore {
+public class SavepointStore {
+
+   private static final Logger LOG = LoggerFactory.getLogger(SavepointStore.class);
+
+   /** Magic number for sanity checks against stored savepoints. */
+   private static final int MAGIC_NUMBER = 0x4960672d;
+
+   /** Prefix for savepoint files. */
+   private static final String prefix = "savepoint-";
 
/**
 * Stores the savepoint.
 *
+* @param targetDirectory Target directory to store savepoint in
 * @param savepoint Savepoint to be stored
 * @paramSavepoint type
 * @return Path of stored savepoint
 * @throws Exception Failures during store are forwarded
 */
-String storeSavepoint(T savepoint) throws Exception;
+   public static  String storeSavepoint(
+   String targetDirectory,
+   T savepoint) throws IOException {
+
+   checkNotNull(targetDirectory, "Target directory");
+   checkNotNull(savepoint, "Savepoint");
+
+   Exception latestException = null;
+   Path path = null;
+   FSDataOutputStream fdos = null;
+
+   FileSystem fs = null;
+
+   // Try to create a FS output stream
+   for (int attempt = 0; attempt < 10; attempt++) {
+   path = new Path(targetDirectory, FileUtils.getRandomFilename(prefix));
+
+   if (fs == null) {
+   fs = FileSystem.get(path.toUri());
+   }
+
+   try {
+   fdos = fs.create(path, false);
+   break;
+   } catch (Exception e) {
+   latestException = e;
+   }
+   }
+
+   if (fdos == null) {
+   throw new IOException("Failed to create file output stream at " + path, latestException);
+   }
+
+   boolean success = false;
+   try (DataOutputStream dos = new DataOutputStream(fdos)) {
+   // Write header
+   dos.writeInt(MAGIC_NUMBER);
+   dos.writeInt(savepoint.getVersion());
+
+   // Write savepoint
+   SavepointSerializer serializer = SavepointSerializers.getSerializer(savepoint);
+   serializer.serialize(savepoint, dos);
+   success = true;
+   } finally {
+   if (!success && fs.exists(path)) {
+   if (!fs.delete(path, true)) {
+   LOG.warn("Failed to delete file " + path + " after failed write.");
--- End diff --

Use the `{}` placeholder here instead of string concatenation.
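With SLF4J, the call would read `LOG.warn("Failed to delete file {} after failed write.", path);`. The message is then only assembled if WARN is enabled. To illustrate what the `{}` placeholder does, here is a simplified, hypothetical formatter. It is not SLF4J's own implementation and skips its escaping rules.

```java
final class PlaceholderSketch {

    /**
     * Simplified stand-in for SLF4J-style formatting: each "{}" is replaced by
     * the next argument in order; surplus placeholders stay as they are.
     * SLF4J's escaping and array handling are deliberately left out.
     */
    static String format(String pattern, Object... args) {
        StringBuilder out = new StringBuilder(pattern.length() + 16);
        int argIndex = 0;
        int from = 0;
        int idx;
        while (argIndex < args.length && (idx = pattern.indexOf("{}", from)) >= 0) {
            out.append(pattern, from, idx);   // text before the placeholder
            out.append(args[argIndex++]);     // String.valueOf semantics, null -> "null"
            from = idx + 2;                   // skip past "{}"
        }
        out.append(pattern, from, pattern.length());
        return out.toString();
    }
}
```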



[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15565534#comment-15565534
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82787864
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
 ---
@@ -48,13 +48,12 @@
public static CompletedCheckpoint loadAndValidateSavepoint(
JobID jobId,
Map tasks,
-   SavepointStore savepointStore,
String savepointPath) throws Exception {
 
// (1) load the savepoint
-   Savepoint savepoint = savepointStore.loadSavepoint(savepointPath);
+   Savepoint savepoint = SavepointStore.loadSavepoint(savepointPath);
final Map taskStates = new HashMap<>(savepoint.getTaskStates().size());
-   
+
// (2) validate it (parallelism, etc)
for (TaskState taskState : savepoint.getTaskStates()) {
ExecutionJobVertex executionJobVertex = tasks.get(taskState.getJobVertexID());
--- End diff --

Just a question for line 74. Shouldn't this be 
`taskState.getMaxParallelism()` and `executionJobVertex.getParallelism()`?




[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15565529#comment-15565529
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82776861
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -637,20 +637,25 @@ protected int savepoint(String[] args) {
"Specify a Job ID to trigger a savepoint."));
}
 
-   return triggerSavepoint(options, jobId);
+   String savepointDirectory = null;
+   if (cleanedArgs.length == 2) {
--- End diff --

Not sure, but what happens if the user types `bin/flink savepoint  
savepointDirectory foobar`. This will ignore the savepointDirectory, right? 
Maybe we could print a corresponding message if something like that happens.




[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15565532#comment-15565532
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82783495
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java ---
@@ -256,29 +306,51 @@ public boolean acknowledgeTask(
 * Aborts a checkpoint because it expired (took too long).
 */
public void abortExpired() throws Exception {
--- End diff --

Would it make sense to narrow down the `Exception` a bit? Maybe by 
introducing a `CheckpointException` or `PendingCheckpointException`?
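A sketch of what such a narrower exception could look like, assuming it only needs to carry the checkpoint ID and the original cause. `PendingCheckpointException` is one of the names floated in the comment; its constructor, fields, and the `PendingCheckpointSketch` class are hypothetical:

```java
/** Hypothetical checked exception for failures while handling a pending checkpoint. */
class PendingCheckpointException extends Exception {
    private static final long serialVersionUID = 1L;
    private final long checkpointId;

    PendingCheckpointException(long checkpointId, String message, Throwable cause) {
        super("Checkpoint " + checkpointId + ": " + message, cause);
        this.checkpointId = checkpointId;
    }

    long getCheckpointId() {
        return checkpointId;
    }
}

/** Hypothetical stand-in for PendingCheckpoint showing the narrowed signature. */
final class PendingCheckpointSketch {
    private final long checkpointId;

    PendingCheckpointSketch(long checkpointId) {
        this.checkpointId = checkpointId;
    }

    /** Callers only have to handle one specific checked type instead of Exception. */
    void abortExpired(Runnable disposeState) throws PendingCheckpointException {
        try {
            disposeState.run();
        } catch (RuntimeException e) {
            throw new PendingCheckpointException(checkpointId, "failed to dispose expired checkpoint", e);
        }
    }
}
```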




[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15565535#comment-15565535
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82778557
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
@@ -282,29 +279,71 @@ public boolean isShutdown() {
//  Handling checkpoints and messages
// 

 
-   public Future triggerSavepoint(long timestamp) throws Exception {
-   CheckpointTriggerResult result = triggerCheckpoint(timestamp, CheckpointProperties.forStandardSavepoint());
+   /**
+* Triggers a savepoint with the default savepoint directory as a target.
+*
+* @param timestamp The timestamp for the savepoint.
+* @return A future to the completed checkpoint
+* @throws IllegalStateException If no default savepoint directory has been configured
+* @throws Exception Failures during triggering are forwarded
+*/
+   public Future triggerSavepoint(long timestamp) throws Exception {
+   return triggerSavepoint(timestamp, null);
+   }
+
+   /**
+* Triggers a savepoint with the given savepoint directory as a target.
+*
+* @param timestamp The timestamp for the savepoint.
+* @param savepointDirectory Target directory for the savepoint.
+* @return A future to the completed checkpoint
+* @throws IllegalStateException If no savepoint directory has been
+*   specified and no default savepoint directory has been
+*   configured
+* @throws Exception Failures during triggering are forwarded
+*/
+   public Future triggerSavepoint(long timestamp, String savepointDirectory) throws Exception {
+   String targetDirectory;
+   if (savepointDirectory != null) {
+   targetDirectory = savepointDirectory;
+   } else if (this.savepointDirectory != null) {
+   targetDirectory = this.savepointDirectory;
+   } else {
+   throw new IllegalStateException("No savepoint directory configured. " +
+   "You can either specify a directory when triggering this savepoint or " +
+   "configure a cluster-wide default via key '" +
+   ConfigConstants.SAVEPOINT_DIRECTORY_KEY + "'.");
+   }
+
+   CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
+   CheckpointTriggerResult result = triggerCheckpoint(timestamp, props, targetDirectory);
 
if (result.isSuccess()) {
-   PendingSavepoint savepoint = (PendingSavepoint) result.getPendingCheckpoint();
-   return savepoint.getCompletionFuture();
-   }
-   else {
-   return Futures.failed(new Exception("Failed to trigger savepoint: " + result.getFailureReason().message()));
+   return result.getPendingCheckpoint().getCompletionFuture();
+   } else {
+   CompletableFuture failed = new FlinkCompletableFuture<>();
+   failed.completeExceptionally(new Exception("Failed to trigger savepoint: " + result.getFailureReason().message()));
--- End diff --

Not adding the complete stack trace is on purpose, right? I'm wondering 
whether it would help to debug problems later.
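A small sketch of the difference the question is about: passing the underlying `Throwable` as the cause keeps its stack trace in the output as a "Caused by:" section. `TriggerFailure` is hypothetical, and it assumes a cause `Throwable` is available at the call site (the diff itself only has a failure-reason message):

```java
import java.io.PrintWriter;
import java.io.StringWriter;

final class TriggerFailure {
    /** Keeps the low-level cause attached instead of flattening it into the message. */
    static Exception wrap(String reason, Throwable cause) {
        return new Exception("Failed to trigger savepoint: " + reason, cause);
    }

    /** Renders a stack trace the way it would appear in a log. */
    static String stackTraceOf(Throwable t) {
        StringWriter sw = new StringWriter();
        t.printStackTrace(new PrintWriter(sw));
        return sw.toString();
    }
}
```

`Exception(String, Throwable)` accepts a `null` cause, so the same call works when no underlying exception exists.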



[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15565526#comment-15565526
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82781688
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java ---
@@ -18,44 +18,243 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.runtime.jobgraph.JobStatus;
+
+import java.io.Serializable;
+
 /**
  * The configuration of a checkpoint, such as whether
  * 
- * The checkpoint is a savepoint
+ * The checkpoint should be persisted
  * The checkpoint must be full, or may be incremental
 * The checkpoint format must be the common (cross backend) format, or may be state-backend specific
--- End diff --

The second and third are not implemented yet, are they?




[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15565542#comment-15565542
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82799818
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala ---
@@ -467,8 +467,11 @@ object JobManagerMessages {
 * of triggering and acknowledging checkpoints.
 *
 * @param jobId The JobID of the job to trigger the savepoint for.
+* @param savepointDirectory Optional target directory
 */
-  case class TriggerSavepoint(jobId: JobID) extends RequiresLeaderSessionID
+  case class TriggerSavepoint(
+  jobId: JobID,
+  savepointDirectory : String = null) extends RequiresLeaderSessionID
--- End diff --

The more idiomatic Scala approach would be to use `Option`.




[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15565527#comment-15565527
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82777602
  
--- Diff: flink-runtime-web/web-dashboard/app/scripts/index.coffee ---
@@ -29,8 +29,8 @@ angular.module('flinkApp', ['ui.router', 'angularMoment'])
 # --
 
 .value 'flinkConfig', {
-  jobServer: ''
-# jobServer: 'http://localhost:8081/'
+#  jobServer: ''
+  jobServer: 'http://localhost:8081/'
--- End diff --

What's the reason for this change?




[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15565541#comment-15565541
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82796870
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java ---
@@ -43,24 +125,62 @@
 * @return The loaded savepoint
 * @throws Exception Failures during load are forwared
 */
-   Savepoint loadSavepoint(String path) throws Exception;
+   public static Savepoint loadSavepoint(String path) throws IOException {
+   Preconditions.checkNotNull(path, "Path");
 
-   /**
-* Disposes the savepoint at the specified path.
-*
-* @param pathPath of savepoint to dispose
-* @throws Exception Failures during diposal are forwarded
-*/
-   void disposeSavepoint(String path) throws Exception;
+   try (DataInputStream dis = new DataInputViewStreamWrapper(createFsInputStream(new Path(path)))) {
+   int magicNumber = dis.readInt();
+
+   if (magicNumber == MAGIC_NUMBER) {
+   int version = dis.readInt();
+
+   SavepointSerializer serializer = SavepointSerializers.getSerializer(version);
+   return serializer.deserialize(dis);
+   } else {
+   throw new RuntimeException("Unexpected magic number. This is most likely " +
+   "caused by trying to load a Flink 1.0 savepoint. You cannot load a " +
+   "savepoint triggered by Flink 1.0 with this version of Flink. If it is " +
+   "_not_ a Flink 1.0 savepoint, this error indicates that the specified " +
+   "file is not a proper savepoint or the file has been corrupted.");
+   }
+   }
+   }
 
/**
-* Shut downs the savepoint store.
+* Removes the savepoint meta data w/o loading and disposing it.
 *
-* Only necessary for implementations where the savepoint life-cycle is
-* bound to the cluster life-cycle.
-*
-* @throws Exception Failures during shut down are forwarded
+* @param path Path of savepoint to remove
+* @throws Exception Failures during disposal are forwarded
 */
-   void shutdown() throws Exception;
+   public static void removeSavepoint(String path) throws Exception {
+   Preconditions.checkNotNull(path, "Path");
+
+   try {
+   LOG.info("Removing savepoint: " + path);
--- End diff --

Use a `{}` placeholder here instead of string concatenation.
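The header check in `loadSavepoint` (read a magic number, then a version, then hand off to a version-specific deserializer) can be sketched with plain `java.io` streams. This is an illustration under assumptions: the magic number value and the `SavepointHeader` class are hypothetical, and the payload is treated as opaque bytes:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical header layout: int magic number, int version, then payload. */
final class SavepointHeader {
    // Illustrative value only; not Flink's actual magic number.
    static final int MAGIC_NUMBER = 0x5AFE5AFE;

    static byte[] write(int version, byte[] payload) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(MAGIC_NUMBER);
            out.writeInt(version);
            out.write(payload);
        }
        return bytes.toByteArray();
    }

    /** Returns the format version, or fails if the magic number does not match. */
    static int readVersion(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int magic = in.readInt();
            if (magic != MAGIC_NUMBER) {
                throw new IOException("Unexpected magic number 0x" + Integer.toHexString(magic)
                        + ": legacy format, not a savepoint, or corrupted file.");
            }
            // A real reader would select a deserializer for this version next.
            return in.readInt();
        }
    }
}
```

This sketch throws `IOException` so that the failure matches its own `throws` clause.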




[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15565533#comment-15565533
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82786136
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java ---
@@ -233,68 +229,90 @@ public int getNumberOfRetainedCheckpoints() {
}
 
@Override
-   public void shutdown() throws Exception {
-   LOG.info("Shutting down");
+   public void shutdown(JobStatus jobStatus) throws Exception {
--- End diff --

Maybe we can narrow down the `Exception` here a bit, or wrap the 
`InterruptedException` and `KeeperException` in our own exception.
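A sketch of the wrapping approach, assuming a single project-specific checked exception is acceptable to callers. `CheckpointStoreException`, `StoreShutdown`, and `ZkAction` are hypothetical; `ZkAction` stands in for a ZooKeeper call that may throw `KeeperException` or `InterruptedException`:

```java
/** Hypothetical checked exception for checkpoint store failures. */
class CheckpointStoreException extends Exception {
    private static final long serialVersionUID = 1L;

    CheckpointStoreException(String message, Throwable cause) {
        super(message, cause);
    }
}

final class StoreShutdown {
    /** Stand-in for a ZooKeeper operation that may throw checked exceptions. */
    interface ZkAction {
        void run() throws Exception;
    }

    static void shutdown(ZkAction removeNodes) throws CheckpointStoreException {
        try {
            removeNodes.run();
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating, so callers can still observe it.
            Thread.currentThread().interrupt();
            throw new CheckpointStoreException("Interrupted while shutting down checkpoint store", e);
        } catch (Exception e) {
            // In the real store this branch would cover ZooKeeper's KeeperException.
            throw new CheckpointStoreException("Failed to shut down checkpoint store", e);
        }
    }
}
```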




[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15565531#comment-15565531
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82778433
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
@@ -282,29 +279,71 @@ public boolean isShutdown() {
//  Handling checkpoints and messages
// 

 
-   public Future triggerSavepoint(long timestamp) throws Exception {
-   CheckpointTriggerResult result = triggerCheckpoint(timestamp, CheckpointProperties.forStandardSavepoint());
+   /**
+* Triggers a savepoint with the default savepoint directory as a target.
+*
+* @param timestamp The timestamp for the savepoint.
+* @return A future to the completed checkpoint
+* @throws IllegalStateException If no default savepoint directory has been configured
+* @throws Exception Failures during triggering are forwarded
+*/
+   public Future triggerSavepoint(long timestamp) throws Exception {
+   return triggerSavepoint(timestamp, null);
+   }
+
+   /**
+* Triggers a savepoint with the given savepoint directory as a target.
+*
+* @param timestamp The timestamp for the savepoint.
+* @param savepointDirectory Target directory for the savepoint.
+* @return A future to the completed checkpoint
+* @throws IllegalStateException If no savepoint directory has been
+*   specified and no default savepoint directory has been
+*   configured
+* @throws Exception Failures during triggering are forwarded
+*/
+   public Future triggerSavepoint(long timestamp, String savepointDirectory) throws Exception {
+   String targetDirectory;
+   if (savepointDirectory != null) {
+   targetDirectory = savepointDirectory;
+   } else if (this.savepointDirectory != null) {
+   targetDirectory = this.savepointDirectory;
+   } else {
+   throw new IllegalStateException("No savepoint directory configured. " +
+   "You can either specify a directory when triggering this savepoint or " +
+   "configure a cluster-wide default via key '" +
+   ConfigConstants.SAVEPOINT_DIRECTORY_KEY + "'.");
+   }
+
+   CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
+   CheckpointTriggerResult result = triggerCheckpoint(timestamp, props, targetDirectory);
 
if (result.isSuccess()) {
-   PendingSavepoint savepoint = (PendingSavepoint) result.getPendingCheckpoint();
-   return savepoint.getCompletionFuture();
-   }
-   else {
-   return Futures.failed(new Exception("Failed to trigger savepoint: " + result.getFailureReason().message()));
+   return result.getPendingCheckpoint().getCompletionFuture();
+   } else {
+   CompletableFuture failed = new FlinkCompletableFuture<>();
--- End diff --

To create an exceptionally completed future, you can use 
`FlinkCompletableFuture.completedExceptionally(Throwable t)`.
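For comparison, the same one-liner factory expressed with the JDK's own `java.util.concurrent.CompletableFuture` (the Flink class is not reproduced here; `FailedFutures` is a hypothetical helper). On Java 9 and later, `CompletableFuture.failedFuture(Throwable)` provides this directly:

```java
import java.util.concurrent.CompletableFuture;

final class FailedFutures {
    /** Returns a future that is already completed with the given failure. */
    static <T> CompletableFuture<T> completedExceptionally(Throwable t) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(t);
        return future;
    }
}
```

A static factory avoids the two-step "create, then complete" pattern at every call site.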



[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15565539#comment-15565539
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82782971
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java ---
@@ -51,9 +57,11 @@
/** States of the different task groups belonging to this checkpoint */
private final Map taskStates;
 
-   /** Flag to indicate whether the completed checkpoint data should be deleted when this
-* handle to the checkpoint is disposed */
-   private final boolean deleteStateWhenDisposed;
+   /** Properties for this checkpoint. */
+   private final CheckpointProperties props;
+
+   /** External path if persisted checkpoint; null otherwise. */
+   private final String externalPath;
--- End diff --

Ok I see, the path is not fully known when creating the 
`CheckpointProperties`...




[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15565538#comment-15565538
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82784524
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java ---
@@ -64,9 +62,9 @@ public void recover() throws Exception {
 
@Override
public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
-   checkpoints.addLast(checkpoint);
+   checkpoints.add(checkpoint);
if (checkpoints.size() > maxNumberOfCheckpointsToRetain) {
-   checkpoints.removeFirst().discardState();
+   checkpoints.remove().subsume();
--- End diff --

What happens if `CheckpointProperties.discardOnSubsumed()` returns `false`? 
This could leave files on your NFS that are never cleaned up even though they 
are not useful for anything else (given that it's not a persistent checkpoint), 
right?
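A toy model of the scenario in question, assuming a FIFO store that subsumes its oldest entry once the retention limit is exceeded, and a per-checkpoint flag that decides whether subsumption discards state. `RetainedCheckpoint` and `BoundedCheckpointStore` are hypothetical simplifications, not the Flink classes:

```java
import java.util.ArrayDeque;

/** Hypothetical checkpoint model with only the fields relevant to subsumption. */
final class RetainedCheckpoint {
    final long id;
    final boolean discardOnSubsumed; // false would model a persistent checkpoint
    boolean discarded;

    RetainedCheckpoint(long id, boolean discardOnSubsumed) {
        this.id = id;
        this.discardOnSubsumed = discardOnSubsumed;
    }

    /** Called when a newer checkpoint pushes this one out of the store. */
    void subsume() {
        if (discardOnSubsumed) {
            discarded = true; // a real implementation would delete the state files here
        }
        // Otherwise the state stays on disk: harmless for a persistent checkpoint,
        // but a leak if the flag is false for a regular one.
    }
}

final class BoundedCheckpointStore {
    private final ArrayDeque<RetainedCheckpoint> checkpoints = new ArrayDeque<>();
    private final int maxRetained;

    BoundedCheckpointStore(int maxRetained) {
        this.maxRetained = maxRetained;
    }

    void add(RetainedCheckpoint checkpoint) {
        checkpoints.add(checkpoint);
        if (checkpoints.size() > maxRetained) {
            checkpoints.remove().subsume(); // oldest first
        }
    }

    int size() {
        return checkpoints.size();
    }
}
```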




[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15565530#comment-15565530
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82778103
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
@@ -219,33 +245,9 @@ public CheckpointCoordinator(
 * Shuts down the checkpoint coordinator.
 *
 * After this method has been called, the coordinator does not accept
-* and further messages and cannot trigger any further checkpoints. All
-* checkpoint state is discarded.
-*/
-   public void shutdown() throws Exception {
-   shutdown(true);
-   }
-
-   /**
-* Suspends the checkpoint coordinator.
-*
-* After this method has been called, the coordinator does not accept
 * and further messages and cannot trigger any further checkpoints.
-*
-* The difference to shutdown is that checkpoint state in the store
-* and counter is kept around if possible to recover later.
-*/
-   public void suspend() throws Exception {
-   shutdown(false);
-   }
-
-   /**
-* Shuts down the checkpoint coordinator.
-*
-* @param shutdownStoreAndCounter Depending on this flag the checkpoint
-* state services are shut down or suspended.
 */
-   private void shutdown(boolean shutdownStoreAndCounter) throws Exception {
+   public void shutdown(JobStatus jobStatus) throws Exception {
--- End diff --

Didn't you tell me that the verb is `shutDown`?




[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1450#comment-1450
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/2608

[FLINK-4512] [FLIP-10] Add option to persist periodic checkpoints

## Introduction

This is the first part of 
[FLIP-10](https://cwiki.apache.org/confluence/display/FLINK/FLIP-10%3A+Unify+Checkpoints+and+Savepoints),
 allowing users to persist periodic checkpoints.

Persistent checkpoints behave very much like regular periodic checkpoints, 
except for the following differences:

1. They persist their meta data (like savepoints).
2. They are not discarded when the owning job fails permanently. 
Furthermore, they can be configured to not be discarded when the job is 
cancelled.

This means that if a job fails permanently, the user will have a checkpoint 
available to restore from. As an example, consider the following scenario: a job 
runs smoothly until it hits a bad record that it cannot handle. Currently, the 
job will try to recover, hit the bad record again, and keep on failing. With 
persistent checkpoints, the user can update the program to handle bad records 
and restore from the most recent persistent checkpoint.

## CheckpointConfig

This adds the following `@PublicEvolving` methods to `CheckpointConfig`:

```
enablePersistentCheckpoints(String targetDirectory);
enablePersistentCheckpoints(String targetDirectory, PersistentCheckpointCleanup cleanup);
```

The `PersistentCheckpointCleanup` enum defines how persistent checkpoints are 
cleaned up when the owning job is cancelled. Since most streaming jobs are 
currently stopped via cancellation, the default is to clean up persistent 
checkpoints. The user can override this behaviour via the enum.
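The disposal rules described in this PR and in the issue can be summarised as a small decision function: regular checkpoints are discarded in every terminal state, while persistent ones are kept on FAILED, discarded on FINISHED, and follow the cleanup setting on CANCELLED. This is a sketch only; `TerminalState`, `CancellationCleanup` (a stand-in for `PersistentCheckpointCleanup`), its constants, and `DisposalPolicy` are hypothetical names:

```java
/** Hypothetical subset of globally terminal job states. */
enum TerminalState { FINISHED, CANCELLED, FAILED }

/** Hypothetical stand-in for PersistentCheckpointCleanup; constant names are invented. */
enum CancellationCleanup { DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION }

final class DisposalPolicy {
    static boolean shouldDiscard(boolean persistent, CancellationCleanup cleanup, TerminalState state) {
        if (!persistent) {
            return true; // regular checkpoints are cleaned up in every terminal state
        }
        switch (state) {
            case FINISHED:
                return true;
            case CANCELLED:
                return cleanup == CancellationCleanup.DELETE_ON_CANCELLATION;
            case FAILED:
                return false; // kept so the user can fix the program and restore
            default:
                throw new IllegalArgumentException("Unknown state: " + state);
        }
    }
}
```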

## REST API

The REST API exposes the external path of the most recent persistent 
checkpoint. This path is also displayed in the web UI.

![screen shot 2016-10-07 at 17 50 44](https://cloud.githubusercontent.com/assets/1756620/19196699/d0d5065a-8cb6-11e6-8b13-c6bacc4ebe19.png)

## Deprecate savepoint state backends (FLINK-4507)

Furthermore, the savepoint state backends have been removed and all 
savepoints now go to files. The corresponding configuration keys have been 
removed or deprecated:

`savepoints.state.backend.fs.dir` has been deprecated in favour of 
`state.savepoints.dir`. `savepoints.state.backend` has been removed.
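One common way to handle such a deprecation is to read the new key first and fall back to the old one. The sketch below assumes that behaviour (the PR only states that the old key is deprecated) and uses a plain `Map` in place of Flink's `Configuration`; `SavepointDirConfig` is hypothetical:

```java
import java.util.Map;

final class SavepointDirConfig {
    static final String KEY = "state.savepoints.dir";
    static final String DEPRECATED_KEY = "savepoints.state.backend.fs.dir";

    /** Returns the default savepoint directory, or null if none is configured. */
    static String resolve(Map<String, String> conf) {
        String dir = conf.get(KEY);
        if (dir == null) {
            // Assumed fallback: honour the deprecated key during the transition period.
            dir = conf.get(DEPRECATED_KEY);
        }
        return dir;
    }
}
```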

## Allow to specify custom savepoint directory (FLINK-4509)

Previously, the target directory for savepoints could only be set in the Flink 
configuration. With this change, it can be overridden per savepoint:

```
bin/flink savepoint  [targetDirectory]
```
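The precedence rule (an explicit target directory wins over the configured 
default) can be sketched in Java as follows. `SavepointTarget` and 
`resolveTargetDirectory` are hypothetical names, and failing when neither 
directory is available is an assumption of this sketch, not behaviour stated 
in the PR.

```java
import java.util.Optional;

/**
 * Hypothetical sketch: a per-savepoint target directory overrides the
 * directory from the Flink configuration (state.savepoints.dir).
 */
public class SavepointTarget {

    static String resolveTargetDirectory(Optional<String> cliTargetDirectory,
                                         Optional<String> configuredDefault) {
        // An explicit directory passed with the savepoint command wins.
        if (cliTargetDirectory.isPresent()) {
            return cliTargetDirectory.get();
        }
        // Otherwise fall back to the configured default (assumed to be required).
        return configuredDefault.orElseThrow(() -> new IllegalStateException(
            "No savepoint target directory given and none configured."));
    }

    public static void main(String[] args) {
        Optional<String> configured = Optional.of("hdfs:///flink/savepoints");
        // Prints hdfs:///flink/savepoints
        System.out.println(resolveTargetDirectory(Optional.empty(), configured));
        // Prints s3://bucket/savepoints
        System.out.println(resolveTargetDirectory(Optional.of("s3://bucket/savepoints"), configured));
    }
}
```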


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink 4512-persistent_checkpoints

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2608.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2608


commit 004ba0b38ac2b75148910660242808c13746c444
Author: Ufuk Celebi 
Date:   2016-10-06T14:43:42Z

[FLINK-4512] [FLIP-10] Add option to persist periodic checkpoints

[FLINK-4509] [FLIP-10] Specify savepoint directory per savepoint
[FLINK-4507] [FLIP-10] Deprecate savepoint backend config





[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-08-26 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15439315#comment-15439315
 ] 

Stephan Ewen commented on FLINK-4512:
-

+1 that makes a lot of sense.

One thing to watch out for is that **stopping** a job currently makes it end in 
state FINISHED. Do we want to remove externalized checkpoints in that case?

It may make sense to change the behavior of "stop()" anyway (have STOPPING and 
STOPPED states), but for now I guess this may cause confusion. Stopping is 
frequently used as a "soft cancel".
