[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization
[ https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16468521#comment-16468521 ] ASF GitHub Bot commented on FLINK-8533: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5427 > Support MasterTriggerRestoreHook state reinitialization > --- > > Key: FLINK-8533 > URL: https://issues.apache.org/jira/browse/FLINK-8533 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.0 >Reporter: Eron Wright >Assignee: Eron Wright >Priority: Major > > {{MasterTriggerRestoreHook}} enables coordination with an external system for > taking or restoring checkpoints. When execution is restarted from a > checkpoint, {{restoreCheckpoint}} is called to restore or reinitialize the > external system state. There's an edge case where the external state is not > adequately reinitialized, that is when execution fails _before the first > checkpoint_. In that case, the hook is not invoked and has no opportunity to > restore the external state to initial conditions. > The impact is a loss of exactly-once semantics in this case. For example, in > the Pravega source function, the reader group state (e.g. stream position > data) is stored externally. In the normal restore case, the reader group > state is forcibly rewound to the checkpointed position. In the edge case > where no checkpoint has yet been successful, the reader group state is not > rewound and consequently some amount of stream data is not reprocessed. > A possible fix would be to introduce an {{initializeState}} method on the > hook interface. Similar to {{CheckpointedFunction::initializeState}}, this > method would be invoked unconditionally upon hook initialization. The Pravega > hook would, for example, initialize or forcibly reinitialize the reader group > state. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
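A minimal sketch of the fix proposed in the issue description, using a heavily simplified hook interface. `SimpleMasterHook`, `ExternalReaderGroup`, and `PravegaLikeHook` are hypothetical stand-ins, not Flink's or Pravega's actual APIs; the real `MasterTriggerRestoreHook` has more methods (triggering, serialization, etc.). The sketch only shows how an `initializeState` method with an empty default body lets existing hooks compile unchanged, while a hook that owns external state can rewind it unconditionally.

```java
/** Hypothetical, simplified stand-in for the hook interface discussed in FLINK-8533. */
interface SimpleMasterHook<T> {
    /** Proposed: invoked unconditionally when the hook is initialized, even before any checkpoint. */
    default void initializeState() throws Exception {
        // Empty default body: existing hook implementations keep compiling unchanged.
    }

    /** Invoked only when there is a completed checkpoint to restore from. */
    void restoreCheckpoint(long checkpointId, T checkpointData) throws Exception;
}

/** Hypothetical external system: a reader group whose stream position lives outside Flink. */
class ExternalReaderGroup {
    long position; // e.g. stream position data, advanced as the source reads
}

/** Hypothetical hook that coordinates the external reader group. */
class PravegaLikeHook implements SimpleMasterHook<Long> {
    private final ExternalReaderGroup readerGroup;

    PravegaLikeHook(ExternalReaderGroup readerGroup) {
        this.readerGroup = readerGroup;
    }

    @Override
    public void initializeState() {
        // Forcibly reinitialize the external state to its initial conditions.
        readerGroup.position = 0L;
    }

    @Override
    public void restoreCheckpoint(long checkpointId, Long checkpointedPosition) {
        // Normal restore case: rewind to the checkpointed position.
        readerGroup.position = checkpointedPosition;
    }
}
```

With this shape, the edge case is covered by calling `initializeState()` even when no checkpoint exists, and the normal case still goes through `restoreCheckpoint`.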
[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization
[ https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16465742#comment-16465742 ] ASF GitHub Bot commented on FLINK-8533: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5427
Thanks a lot, this looks good and I can merge it now.
One quick question: you decided to have empty default implementations for the new methods in the master hook interface. Given that Pravega is currently the only known user of that interface, I would be okay with breaking the interface (no default methods) if you think that would be cleaner.
[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization
[ https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16464530#comment-16464530 ] ASF GitHub Bot commented on FLINK-8533: --- Github user EronWright commented on the issue: https://github.com/apache/flink/pull/5427
@StephanEwen thanks again for the feedback, which I took to heart: I simplified the hook. It now has a `reset` method that is called only in the special case where there is no checkpoint to restore. I will refactor the thread context class loader code in a separate PR.
[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization
[ https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454653#comment-16454653 ] ASF GitHub Bot commented on FLINK-8533: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5427#discussion_r184482881
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
@@ -1358,6 +1358,10 @@ class JobManager(
throw new SuppressRestartsException(e)
}
}
+else {
--- End diff --
If we did not have the unconditional initialization logic, we should also be able to drop this part. That would also make this work with the FLIP-6 code.
[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization
[ https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454651#comment-16454651 ] ASF GitHub Bot commented on FLINK-8533: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5427#discussion_r184477785
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
@@ -1009,6 +1013,11 @@ public boolean restoreLatestCheckpointedState(
LOG.debug("Status of the shared state registry after restore: {}.", sharedStateRegistry);
+ // Instruct the master hooks to initialize their state (unconditionally)
+ LOG.debug("Initializing the master hooks.");
--- End diff --
Can you elaborate a bit on why this initialization happens in all cases? An alternative would be to have a `reset()` method (or similar) on the master hook that is called further below, in the `if (latest == null)` code block. Initializing the state here seems a bit unintuitive to me: I would assume that an init function is called once, while this one is called on every restore.
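A minimal sketch of the alternative suggested in this review comment: hooks get a `reset()` method that the coordinator calls only in the `if (latest == null)` branch, i.e. when there is no completed checkpoint to restore. `ResettableHook`, `MiniCoordinator`, and `RecordingHook` are hypothetical, drastically simplified types; they do not mirror the real `CheckpointCoordinator` signature or its checkpoint store.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified master hook with the proposed reset() method. */
interface ResettableHook {
    /** Restore external state from a completed checkpoint. */
    void restoreCheckpoint(long checkpointId) throws Exception;

    /** Reset external state to initial conditions; called only when no checkpoint exists. */
    default void reset() throws Exception {}
}

/** Hypothetical, heavily simplified coordinator showing where reset() would be called. */
class MiniCoordinator {
    private final List<ResettableHook> masterHooks = new ArrayList<>();
    private Long latestCompletedCheckpointId; // null until the first checkpoint completes

    void addHook(ResettableHook hook) {
        masterHooks.add(hook);
    }

    void completeCheckpoint(long checkpointId) {
        latestCompletedCheckpointId = checkpointId;
    }

    /** Returns true if state was restored from a checkpoint, false if the hooks were reset. */
    boolean restoreLatestCheckpointedState() throws Exception {
        Long latest = latestCompletedCheckpointId;
        if (latest == null) {
            // Edge case from FLINK-8533: failure before the first checkpoint.
            // Without this call the external state would never be rewound.
            for (ResettableHook hook : masterHooks) {
                hook.reset();
            }
            return false;
        }
        for (ResettableHook hook : masterHooks) {
            hook.restoreCheckpoint(latest);
        }
        return true;
    }
}

/** Hypothetical hook that records which lifecycle calls it received. */
class RecordingHook implements ResettableHook {
    final List<String> calls = new ArrayList<>();

    @Override
    public void restoreCheckpoint(long checkpointId) {
        calls.add("restore:" + checkpointId);
    }

    @Override
    public void reset() {
        calls.add("reset");
    }
}
```

Compared with unconditional initialization, this keeps the normal restore path untouched and confines the extra call to the one case that needs it.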
[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization
[ https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454654#comment-16454654 ] ASF GitHub Bot commented on FLINK-8533: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5427#discussion_r184478068
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java ---
@@ -138,4 +158,11 @@
*/
MasterTriggerRestoreHook create();
}
+
+ /**
+* The hook initialization context.
+*/
+ interface HookInitializationContext {
--- End diff --
I assume this is to make the interface easier to evolve in the future?
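If the context interface is indeed meant for evolvability, as assumed in the comment above, the underlying pattern is a parameter object: a hook method that takes a context interface, rather than individual arguments, keeps a stable signature while new accessors are added to the context later (for example as default methods). A minimal sketch; `InitContext`, `EvolvableHook`, `LegacyContext`, and the accessor names are hypothetical and are not the PR's actual `HookInitializationContext`.

```java
/** Hypothetical initialization context; "version 1" exposed only the job name. */
interface InitContext {
    String getJobName();

    /** Added "later": a default method keeps existing implementations compiling. */
    default int getMaxParallelism() {
        return -1; // hypothetical placeholder meaning "unknown"
    }
}

/** Hypothetical hook whose method signature stays stable as InitContext grows. */
interface EvolvableHook {
    void initializeState(InitContext context) throws Exception;
}

/** A context implementation written before getMaxParallelism() existed. */
class LegacyContext implements InitContext {
    private final String jobName;

    LegacyContext(String jobName) {
        this.jobName = jobName;
    }

    @Override
    public String getJobName() {
        return jobName;
    }
}
```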
[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization
[ https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454652#comment-16454652 ] ASF GitHub Bot commented on FLINK-8533: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5427#discussion_r184481704
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java ---
@@ -291,6 +341,34 @@ else if (!allowUnmatchedState) {
this.userClassLoader = Preconditions.checkNotNull(userClassLoader);
}
+ @Override
+ public void initializeState(HookInitializationContext context) throws Exception {
+ final Thread thread = Thread.currentThread();
--- End diff --
We could use a utility like:
```java
public static void withContextClassLoader(ClassLoader cl, Runnable r) {
    final Thread thread = Thread.currentThread();
    final ClassLoader originalClassLoader = thread.getContextClassLoader();
    try {
        // install the class loader that was passed in
        thread.setContextClassLoader(cl);
        r.run();
    } finally {
        // always restore the original context class loader
        thread.setContextClassLoader(originalClassLoader);
    }
}

// note: Runnable.run() cannot throw checked exceptions, so wrapping
// hook.initializeState(context), which declares "throws Exception",
// needs a functional interface whose method is allowed to throw
withContextClassLoader(userClassLoader, () -> {
    hook.initializeState(context);
});
```
How about adding that to `org.apache.flink.util.LambdaUtil`?
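Because `Runnable.run()` cannot throw checked exceptions, a version of the suggested utility that can wrap `hook.initializeState(context)` (which declares `throws Exception`) needs a throwing functional interface. A self-contained sketch follows; `ThrowingRunnable` and `ContextClassLoaderUtil` are defined locally here as assumptions and make no claim about what `org.apache.flink.util.LambdaUtil` actually provides.

```java
/** Locally defined functional interface whose body may throw a checked exception. */
@FunctionalInterface
interface ThrowingRunnable<E extends Throwable> {
    void run() throws E;
}

/** Hypothetical helper mirroring the utility suggested in the review. */
final class ContextClassLoaderUtil {
    private ContextClassLoaderUtil() {}

    /**
     * Runs {@code r} with {@code cl} installed as the current thread's context class loader,
     * and restores the previous context class loader even if {@code r} throws.
     */
    static <E extends Throwable> void withContextClassLoader(ClassLoader cl, ThrowingRunnable<E> r)
            throws E {
        final Thread thread = Thread.currentThread();
        final ClassLoader original = thread.getContextClassLoader();
        try {
            thread.setContextClassLoader(cl);
            r.run();
        } finally {
            thread.setContextClassLoader(original);
        }
    }
}
```

A call site would then read `ContextClassLoaderUtil.withContextClassLoader(userClassLoader, () -> hook.initializeState(context));`. Java infers `E` from the lambda body, so the checked `Exception` propagates to the caller, while a lambda that throws nothing checked infers `E` as `RuntimeException` and needs no `try`/`catch`.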
[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization
[ https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16419889#comment-16419889 ] ASF GitHub Bot commented on FLINK-8533: --- Github user EronWright commented on the issue: https://github.com/apache/flink/pull/5427
@tillrohrmann @StephanEwen sorry about the long delay here; would you please take another look? I followed Stephan's suggestion of not introducing a new method on the `CheckpointCoordinator`. However, the semantics I was aiming for with `initializeState` are that it is called on both _start_ and _restart_. I adjusted `JobManager` to call `restoreLatestCheckpointedState` on first execution (as `JobMaster` does). Are you OK with that?
[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization
[ https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365234#comment-16365234 ] ASF GitHub Bot commented on FLINK-8533: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5427
I think we are talking about two different things here:
1. We DO need a new method on the hook interface to reinitialize the reader group. I think the one suggested by Eron works.
2. We DO NOT need a new method on the CheckpointCoordinator; calling `Hook.initializeState()` within `CC.restoreLatestCheckpointedState()` (whenever there is no checkpoint to be restored) should work.
==> Let's add the Hook method, but not add an additional method to the CheckpointCoordinator.
[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization
[ https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364336#comment-16364336 ] ASF GitHub Bot commented on FLINK-8533: --- Github user EronWright commented on the issue: https://github.com/apache/flink/pull/5427
I feel that we're not addressing the core issue we're trying to fix:
1. A new job starts up with checkpointing enabled and a hook-based source.
2. The source begins to consume events, causing some external state to be mutated.
3. _Before the first checkpoint_, a task throws an exception, causing a global restart.
4. Since the hook has no opportunity to rewind the external state to initial conditions, data loss occurs.
The above is a special case. In the normal case, one or more checkpoints have completed before the restart, so the hook's `restoreCheckpoint` method is effective.
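The four steps above can be simulated with a toy model in which the reader position lives outside the job. Everything here (`EdgeCaseSimulation`, `simulate`, the record counts) is hypothetical and only illustrates why, without a reset on the no-checkpoint restart path, records consumed before the failure are skipped after the restart instead of being reprocessed.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical toy model of the FLINK-8533 failure scenario. */
final class EdgeCaseSimulation {
    private EdgeCaseSimulation() {}

    /**
     * A job consumes {@code beforeFailure} records, fails before the first checkpoint,
     * restarts, and then reads until the stream position reaches {@code total}.
     * Returns the record offsets processed after the restart.
     */
    static List<Integer> simulate(int total, int beforeFailure, boolean resetOnRestart) {
        int externalPosition = 0; // stream position stored outside Flink

        // Steps 1-2: the source consumes records, mutating the external position.
        for (int i = 0; i < beforeFailure; i++) {
            externalPosition++;
        }

        // Step 3: failure before the first checkpoint; operator state restarts empty.
        // Step 4: only an explicit reset of the external state rewinds the position.
        if (resetOnRestart) {
            externalPosition = 0;
        }

        List<Integer> processedAfterRestart = new ArrayList<>();
        while (externalPosition < total) {
            processedAfterRestart.add(externalPosition++);
        }
        return processedAfterRestart;
    }
}
```

For example, `simulate(10, 4, false)` yields offsets 4 through 9, so offsets 0 to 3 are never reprocessed after the restart, while `simulate(10, 4, true)` yields all ten offsets.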
[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization
[ https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16363764#comment-16363764 ] ASF GitHub Bot commented on FLINK-8533: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5427
@EronWright, you're right that in the old code we don't call `restoreLatestCheckpointedState` on initial submission. With FLIP-6 this will be the case; see #5444. The underlying assumption that makes this work, though, is that a user won't submit a new job under a job id and cluster id for which ZooKeeper already contains persisted checkpoints from a previous run. So either the cluster id or the job id must be different. So far, I think this holds when using the Flink client. However, if you generate the `JobGraph` yourself and keep it around to submit it to a standalone cluster, this assumption breaks, because both the `JobID` and the cluster id will be the same.
[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization
[ https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362722#comment-16362722 ] ASF GitHub Bot commented on FLINK-8533: --- Github user EronWright commented on the issue: https://github.com/apache/flink/pull/5427
@StephanEwen do you mean that we could avoid adding an initialization method to the hook interface? We need to somehow call into the hook to reset its state, even when there's no checkpoint data to work with. Could you rephrase your suggestion? Thanks.
[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization
[ https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362662#comment-16362662 ] ASF GitHub Bot commented on FLINK-8533: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5427
I think we may be able to get around adding a new method. I checked with @tillrohrmann; here are our thoughts:
- A job that is submitted initially starts with a new reader group, so there is no need to reset here.
- Recovering at any point calls the `restoreLatestCheckpointedState()` method.
- Recovering from a JobManager failover also basically "resubmits" the job with a special flag that causes the JM to call `restoreLatestCheckpointedState()` for the job.
[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization
[ https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361295#comment-16361295 ] ASF GitHub Bot commented on FLINK-8533: --- Github user EronWright commented on the issue: https://github.com/apache/flink/pull/5427

@StephanEwen thanks for taking a look; I agree with trying to avoid a new lifecycle method. The `initializeState` method on the hook interface gives the hook an unconditional initialization point. In the Pravega case, we would move reader-group (RG) initialization from client to server, and always reset the RG to its initial conditions. A subsequent restore may or may not occur.

Assuming we like this approach, let's discuss how to make it work purely with `restoreLatestCheckpointedState`. The ExecutionGraph (EG) does not call `restoreLatestCheckpointedState` upon initial execution, but it would need to in order to support the new `initializeState` method. Would there be any issue with calling `restoreLatestCheckpointedState` on initial execution? Such symmetry would seem desirable.

**Existing approach**:
```
=== initial ===
\-- JM.submitJob
|   \-- EG.scheduleForExecution
=== restart ===
\-- RestartCallback.triggerFullRecovery
|   \-- EG.restart
|   |   \-- CC.restoreLatestCheckpointedState
|   |   \-- EG.scheduleForExecution
```

**Suggested approach**:
```
=== initial ===
\-- JM.submitJob
|   \-- EG.start (** new method **)
|   |   \-- CC.restoreLatestCheckpointedState
|   |   |   \-- Hook.initializeState
|   |   \-- EG.scheduleForExecution
=== restart ===
\-- RestartCallback.triggerFullRecovery
|   \-- EG.restart
|   |   \-- CC.restoreLatestCheckpointedState
|   |   |   \-- Hook.initializeState
|   |   \-- EG.scheduleForExecution
```
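The hook semantics described above (always reset the reader group to its initial conditions, then optionally restore) can be sketched as follows. This is a minimal model under stated assumptions: `ReaderGroupState`, `InitializableHook`, and `ReaderGroupHook` are hypothetical names, the externally stored reader-group position is reduced to a single `long` offset, and neither Pravega's reader group API nor Flink's `MasterTriggerRestoreHook` looks like this.

```java
// Hypothetical model of externally stored reader-group state;
// a single stream offset stands in for the real position data.
class ReaderGroupState {
    long position;
}

// Simplified hook contract assumed for this sketch; NOT Flink's interface.
interface InitializableHook {
    // Called unconditionally on initial execution and on every global restart.
    void initializeState();

    // Called only when a completed checkpoint exists, after initializeState().
    void restoreCheckpoint(long checkpointedPosition);
}

class ReaderGroupHook implements InitializableHook {
    private final ReaderGroupState readerGroup;
    private final long initialPosition;

    ReaderGroupHook(ReaderGroupState readerGroup, long initialPosition) {
        this.readerGroup = readerGroup;
        this.initialPosition = initialPosition;
    }

    @Override
    public void initializeState() {
        // Forcibly reset the external state to its initial conditions.
        readerGroup.position = initialPosition;
    }

    @Override
    public void restoreCheckpoint(long checkpointedPosition) {
        // Rewind the external state to the checkpointed position.
        readerGroup.position = checkpointedPosition;
    }
}
```

With this shape, a failure before the first checkpoint still rewinds the reader group (via `initializeState`), so data read before the failure is reprocessed; when a checkpoint exists, the restore simply overrides the reset.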
[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization
[ https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361128#comment-16361128 ] ASF GitHub Bot commented on FLINK-8533: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5427

Given the already non-trivial complexity of the `CheckpointCoordinator`, I am wondering if there is a way to do this without adding the `resetForNewExecution()` method. Adding another lifecycle method (even if it is not used for other purposes currently) would need more involved tests and make future maintenance harder. Can we reset all hooks in the `restoreLatestCheckpointedState()` method? Would we miss any cases if we do only that?
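The question above can be made concrete with a minimal sketch in which `restoreLatestCheckpointedState()` is the only place hooks are touched: if no checkpoint has completed, every hook is reset; otherwise every hook is restored from the latest checkpoint. `ResettableHook`, `RecordingHook`, and `SketchCoordinator` are hypothetical names, checkpoint data is reduced to a checkpoint ID, and this is not the actual `CheckpointCoordinator` code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified hook contract; NOT Flink's MasterTriggerRestoreHook.
interface ResettableHook {
    // Return the external system to its initial conditions.
    void reset();

    // Rewind the external system to a completed checkpoint.
    void restoreCheckpoint(long checkpointId);
}

// Records which callback ran, for demonstration and testing.
class RecordingHook implements ResettableHook {
    final List<String> events = new ArrayList<>();

    @Override
    public void reset() {
        events.add("reset");
    }

    @Override
    public void restoreCheckpoint(long checkpointId) {
        events.add("restore:" + checkpointId);
    }
}

class SketchCoordinator {
    private final List<ResettableHook> hooks = new ArrayList<>();
    // Null until the first checkpoint completes.
    private Long latestCompletedCheckpointId = null;

    void addHook(ResettableHook hook) {
        hooks.add(hook);
    }

    void completeCheckpoint(long checkpointId) {
        latestCompletedCheckpointId = checkpointId;
    }

    // Single entry point for recovery: hooks are either reset or restored here.
    void restoreLatestCheckpointedState() {
        if (latestCompletedCheckpointId == null) {
            // The FLINK-8533 edge case: failure before the first checkpoint.
            for (ResettableHook hook : hooks) {
                hook.reset();
            }
        } else {
            for (ResettableHook hook : hooks) {
                hook.restoreCheckpoint(latestCompletedCheckpointId);
            }
        }
    }
}
```

In this sketch an initial submission that never calls the restore method leaves the hooks untouched, which is consistent with the view that a freshly submitted job needs no reset.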
[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization
[ https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360404#comment-16360404 ] Tzu-Li (Gordon) Tai commented on FLINK-8533: [~eronwright] thanks for the heads up on this ticket. Regarding your comment on the Kafka connector's `setStartFromGroupOffsets`: AFAIK, the problem should not occur there, because we differentiate between having restored state and having no restored state when determining whether to respect the `StartupMode`. But yes, for the Pravega connector, I think this is valid. Will take a look at the pull request soon.
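The distinction described above (restored state takes precedence; the startup mode applies only when nothing was restored) can be sketched as a small resolver. This is an illustration under assumptions, not the Kafka connector's code: `StartupModeSketch`, `StartPositionResolver`, the sentinel offsets, and the fallback for partitions without a committed offset are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical subset of startup modes; mirrors the idea of the Kafka
// connector's StartupMode but is NOT its actual API.
enum StartupModeSketch { GROUP_OFFSETS, EARLIEST, LATEST }

class StartPositionResolver {
    // Sentinel values assumed for this sketch only.
    static final long EARLIEST_OFFSET = -2L;
    static final long LATEST_OFFSET = -1L;

    static Map<Integer, Long> resolve(
            Map<Integer, Long> restoredState, // null when nothing was restored
            StartupModeSketch mode,
            Map<Integer, Long> committedGroupOffsets,
            Iterable<Integer> partitions) {
        if (restoredState != null) {
            // Restored state wins; the startup mode is ignored.
            return new HashMap<>(restoredState);
        }
        // No restored state: respect the configured startup mode.
        Map<Integer, Long> start = new HashMap<>();
        for (int p : partitions) {
            switch (mode) {
                case GROUP_OFFSETS:
                    // Fallback for partitions without a committed offset is an assumption.
                    start.put(p, committedGroupOffsets.getOrDefault(p, EARLIEST_OFFSET));
                    break;
                case EARLIEST:
                    start.put(p, EARLIEST_OFFSET);
                    break;
                default:
                    start.put(p, LATEST_OFFSET);
                    break;
            }
        }
        return start;
    }
}
```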
[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization
[ https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356450#comment-16356450 ] ASF GitHub Bot commented on FLINK-8533: --- Github user EronWright commented on the issue: https://github.com/apache/flink/pull/5427 @tzulitai @StephanEwen please take a look, thanks.
[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization
[ https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356449#comment-16356449 ] ASF GitHub Bot commented on FLINK-8533: --- GitHub user EronWright opened a pull request: https://github.com/apache/flink/pull/5427

[FLINK-8533] [checkpointing] Support MasterTriggerRestoreHook state reinitialization

Signed-off-by: Eron Wright

## What is the purpose of the change

Support MasterTriggerRestoreHook state re-initialization, to eliminate an edge case involving execution restarts where no checkpoint state is available.

## Brief change log

- extend `MasterTriggerRestoreHook` with `initializeState` method.
- invoke `initializeState` upon initial execution and upon global restart.

## Verifying this change

- Revised test: `CheckpointCoordinatorMasterHooksTest`

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **yes**
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **yes**
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/EronWright/flink FLINK-8533-hook-initialization

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5427.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5427

commit 9ad0e03c1aa81012ae14f598acdcf3eb76c9ec9f
Author: Eron Wright
Date: 2018-02-08T03:38:47Z

[FLINK-8533] Support MasterTriggerRestoreHook state reinitialization

- extend `MasterTriggerRestoreHook` with `initializeState` method.
- invoke `initializeState` upon initial execution and upon global restart.

Signed-off-by: Eron Wright
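The change log extends an interface that is part of the public (evolving) API. One common Java 8 technique for adding a method to such an interface without breaking existing implementations is a `default` method with a no-op body. The sketch below assumes that technique (the PR text does not say how the new method is declared) and uses a hypothetical `MasterHookSketch` interface rather than Flink's real `MasterTriggerRestoreHook`.

```java
// Hypothetical, trimmed-down hook interface; NOT Flink's MasterTriggerRestoreHook.
interface MasterHookSketch {
    String getIdentifier();

    // New lifecycle method with a no-op default body, so hook
    // implementations written before the change still compile unchanged.
    default void initializeState() {
        // no-op by default
    }
}

// A pre-existing implementation that knows nothing about initializeState().
class LegacyHook implements MasterHookSketch {
    @Override
    public String getIdentifier() {
        return "legacy-hook";
    }
}

// A newer implementation that opts in to the initialization callback.
class InitializingHook implements MasterHookSketch {
    int initializations = 0;

    @Override
    public String getIdentifier() {
        return "initializing-hook";
    }

    @Override
    public void initializeState() {
        initializations++;
    }
}
```

If the method is invoked upon initial execution and again upon each global restart, an opting-in hook sees one call per execution attempt, while legacy hooks silently inherit the no-op.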
[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization
[ https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348017#comment-16348017 ] Eron Wright commented on FLINK-8533: - Incidentally, a variation on this problem would (I believe) occur when using the Kafka consumer with {{setStartFromGroupOffsets}} enabled. That's because the Kafka connector uses external storage to initialize its state (i.e., the starting position) in the non-restore case.
[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization
[ https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346400#comment-16346400 ] Eron Wright commented on FLINK-8533: - This relates to [issue #89|https://github.com/pravega/flink-connectors/issues/89] in flink-connectors. cc [~tzulitai]