[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization

2018-05-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16468521#comment-16468521
 ] 

ASF GitHub Bot commented on FLINK-8533:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5427


> Support MasterTriggerRestoreHook state reinitialization
> ---
>
> Key: FLINK-8533
> URL: https://issues.apache.org/jira/browse/FLINK-8533
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Major
>
> {{MasterTriggerRestoreHook}} enables coordination with an external system for 
> taking or restoring checkpoints. When execution is restarted from a 
> checkpoint, {{restoreCheckpoint}} is called to restore or reinitialize the 
> external system state. There's an edge case where the external state is not 
> adequately reinitialized, that is when execution fails _before the first 
> checkpoint_. In that case, the hook is not invoked and has no opportunity to 
> restore the external state to initial conditions.
> The impact is a loss of exactly-once semantics in this case. For example, in 
> the Pravega source function, the reader group state (e.g. stream position 
> data) is stored externally. In the normal restore case, the reader group 
> state is forcibly rewound to the checkpointed position. In the edge case 
> where no checkpoint has yet been successful, the reader group state is not 
> rewound and consequently some amount of stream data is not reprocessed.
> A possible fix would be to introduce an {{initializeState}} method on the 
> hook interface. Similar to {{CheckpointedFunction::initializeState}}, this 
> method would be invoked unconditionally upon hook initialization. The Pravega 
> hook would, for example, initialize or forcibly reinitialize the reader group 
> state.    



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16465742#comment-16465742
 ] 

ASF GitHub Bot commented on FLINK-8533:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5427
  
Thanks a lot, looks good, can merge this now.

One quick question: You decided to have empty default implementations for 
the new methods in the master hook interface. Given that Pravega is currently 
the only known user of that interface, I would be okay with breaking the 
interface (no default methods) if you think that would be cleaner.


> Support MasterTriggerRestoreHook state reinitialization
> ---
>
> Key: FLINK-8533
> URL: https://issues.apache.org/jira/browse/FLINK-8533
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Major
>
> {{MasterTriggerRestoreHook}} enables coordination with an external system for 
> taking or restoring checkpoints. When execution is restarted from a 
> checkpoint, {{restoreCheckpoint}} is called to restore or reinitialize the 
> external system state. There's an edge case where the external state is not 
> adequately reinitialized, that is when execution fails _before the first 
> checkpoint_. In that case, the hook is not invoked and has no opportunity to 
> restore the external state to initial conditions.
> The impact is a loss of exactly-once semantics in this case. For example, in 
> the Pravega source function, the reader group state (e.g. stream position 
> data) is stored externally. In the normal restore case, the reader group 
> state is forcibly rewound to the checkpointed position. In the edge case 
> where no checkpoint has yet been successful, the reader group state is not 
> rewound and consequently some amount of stream data is not reprocessed.
> A possible fix would be to introduce an {{initializeState}} method on the 
> hook interface. Similar to {{CheckpointedFunction::initializeState}}, this 
> method would be invoked unconditionally upon hook initialization. The Pravega 
> hook would, for example, initialize or forcibly reinitialize the reader group 
> state.    



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16464530#comment-16464530
 ] 

ASF GitHub Bot commented on FLINK-8533:
---

Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/5427
  
@StephanEwen thanks again for the feedback, which I took to heart and 
simplified the hook.  It now has a `reset ` method that is called only in the 
special case.

I will refactor the thread context code in a separate PR.



> Support MasterTriggerRestoreHook state reinitialization
> ---
>
> Key: FLINK-8533
> URL: https://issues.apache.org/jira/browse/FLINK-8533
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Major
>
> {{MasterTriggerRestoreHook}} enables coordination with an external system for 
> taking or restoring checkpoints. When execution is restarted from a 
> checkpoint, {{restoreCheckpoint}} is called to restore or reinitialize the 
> external system state. There's an edge case where the external state is not 
> adequately reinitialized, that is when execution fails _before the first 
> checkpoint_. In that case, the hook is not invoked and has no opportunity to 
> restore the external state to initial conditions.
> The impact is a loss of exactly-once semantics in this case. For example, in 
> the Pravega source function, the reader group state (e.g. stream position 
> data) is stored externally. In the normal restore case, the reader group 
> state is forcibly rewound to the checkpointed position. In the edge case 
> where no checkpoint has yet been successful, the reader group state is not 
> rewound and consequently some amount of stream data is not reprocessed.
> A possible fix would be to introduce an {{initializeState}} method on the 
> hook interface. Similar to {{CheckpointedFunction::initializeState}}, this 
> method would be invoked unconditionally upon hook initialization. The Pravega 
> hook would, for example, initialize or forcibly reinitialize the reader group 
> state.    



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization

2018-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454653#comment-16454653
 ] 

ASF GitHub Bot commented on FLINK-8533:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5427#discussion_r184482881
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -1358,6 +1358,10 @@ class JobManager(
   throw new SuppressRestartsException(e)
   }
 }
+else {
--- End diff --

If we did not have the unconditional initialization logic, we should also 
be able to drop this part. That would also make this work with the flip-6 code.


> Support MasterTriggerRestoreHook state reinitialization
> ---
>
> Key: FLINK-8533
> URL: https://issues.apache.org/jira/browse/FLINK-8533
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Major
>
> {{MasterTriggerRestoreHook}} enables coordination with an external system for 
> taking or restoring checkpoints. When execution is restarted from a 
> checkpoint, {{restoreCheckpoint}} is called to restore or reinitialize the 
> external system state. There's an edge case where the external state is not 
> adequately reinitialized, that is when execution fails _before the first 
> checkpoint_. In that case, the hook is not invoked and has no opportunity to 
> restore the external state to initial conditions.
> The impact is a loss of exactly-once semantics in this case. For example, in 
> the Pravega source function, the reader group state (e.g. stream position 
> data) is stored externally. In the normal restore case, the reader group 
> state is forcibly rewound to the checkpointed position. In the edge case 
> where no checkpoint has yet been successful, the reader group state is not 
> rewound and consequently some amount of stream data is not reprocessed.
> A possible fix would be to introduce an {{initializeState}} method on the 
> hook interface. Similar to {{CheckpointedFunction::initializeState}}, this 
> method would be invoked unconditionally upon hook initialization. The Pravega 
> hook would, for example, initialize or forcibly reinitialize the reader group 
> state.    



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization

2018-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454651#comment-16454651
 ] 

ASF GitHub Bot commented on FLINK-8533:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5427#discussion_r184477785
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -1009,6 +1013,11 @@ public boolean restoreLatestCheckpointedState(
 
LOG.debug("Status of the shared state registry after 
restore: {}.", sharedStateRegistry);
 
+   // Instruct the master hooks to initialize their state 
(unconditionally)
+   LOG.debug("Initializing the master hooks.");
--- End diff --

Can you elaborate a bit why this initialization is happening in all cases?
An alternative would be to have a `reset()` method or so on the master hook 
that is called further below, in the `if (latest == null)` code block.

Initializing the state seems a tad bit unituitive to me here - I somehow 
assume that an init function is called once, while this one here is called on 
every restore.


> Support MasterTriggerRestoreHook state reinitialization
> ---
>
> Key: FLINK-8533
> URL: https://issues.apache.org/jira/browse/FLINK-8533
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Major
>
> {{MasterTriggerRestoreHook}} enables coordination with an external system for 
> taking or restoring checkpoints. When execution is restarted from a 
> checkpoint, {{restoreCheckpoint}} is called to restore or reinitialize the 
> external system state. There's an edge case where the external state is not 
> adequately reinitialized, that is when execution fails _before the first 
> checkpoint_. In that case, the hook is not invoked and has no opportunity to 
> restore the external state to initial conditions.
> The impact is a loss of exactly-once semantics in this case. For example, in 
> the Pravega source function, the reader group state (e.g. stream position 
> data) is stored externally. In the normal restore case, the reader group 
> state is forcibly rewound to the checkpointed position. In the edge case 
> where no checkpoint has yet been successful, the reader group state is not 
> rewound and consequently some amount of stream data is not reprocessed.
> A possible fix would be to introduce an {{initializeState}} method on the 
> hook interface. Similar to {{CheckpointedFunction::initializeState}}, this 
> method would be invoked unconditionally upon hook initialization. The Pravega 
> hook would, for example, initialize or forcibly reinitialize the reader group 
> state.    



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization

2018-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454654#comment-16454654
 ] 

ASF GitHub Bot commented on FLINK-8533:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5427#discussion_r184478068
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java
 ---
@@ -138,4 +158,11 @@
 */
 MasterTriggerRestoreHook create();
}
+
+   /**
+* The hook initialization context.
+*/
+   interface HookInitializationContext {
--- End diff --

I assume this is for ease of future evolvability?


> Support MasterTriggerRestoreHook state reinitialization
> ---
>
> Key: FLINK-8533
> URL: https://issues.apache.org/jira/browse/FLINK-8533
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Major
>
> {{MasterTriggerRestoreHook}} enables coordination with an external system for 
> taking or restoring checkpoints. When execution is restarted from a 
> checkpoint, {{restoreCheckpoint}} is called to restore or reinitialize the 
> external system state. There's an edge case where the external state is not 
> adequately reinitialized, that is when execution fails _before the first 
> checkpoint_. In that case, the hook is not invoked and has no opportunity to 
> restore the external state to initial conditions.
> The impact is a loss of exactly-once semantics in this case. For example, in 
> the Pravega source function, the reader group state (e.g. stream position 
> data) is stored externally. In the normal restore case, the reader group 
> state is forcibly rewound to the checkpointed position. In the edge case 
> where no checkpoint has yet been successful, the reader group state is not 
> rewound and consequently some amount of stream data is not reprocessed.
> A possible fix would be to introduce an {{initializeState}} method on the 
> hook interface. Similar to {{CheckpointedFunction::initializeState}}, this 
> method would be invoked unconditionally upon hook initialization. The Pravega 
> hook would, for example, initialize or forcibly reinitialize the reader group 
> state.    



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization

2018-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454652#comment-16454652
 ] 

ASF GitHub Bot commented on FLINK-8533:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5427#discussion_r184481704
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
 ---
@@ -291,6 +341,34 @@ else if (!allowUnmatchedState) {
this.userClassLoader = 
Preconditions.checkNotNull(userClassLoader);
}
 
+   @Override
+   public void initializeState(HookInitializationContext context) 
throws Exception {
+   final Thread thread = Thread.currentThread();
--- End diff --

We could use a utility like:
```java
public static void withContextClassLoader(ClassLoader cl, Runnable r) {
final Thread thread = Thread.currentThread();
final ClassLoader originalClassLoader = thread.getContextClassLoader();

try {
thread.setContextClassLoader(userClassLoader);
r.run();
}
finally {
thread.setContextClassLoader(originalClassLoader);
}
}

withContextClassLoader(userClassLoader, () -> { 
hook.initializeState(context); });
```

How about adding that to `org.apache.flink.util.LambdaUtil`?


> Support MasterTriggerRestoreHook state reinitialization
> ---
>
> Key: FLINK-8533
> URL: https://issues.apache.org/jira/browse/FLINK-8533
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Major
>
> {{MasterTriggerRestoreHook}} enables coordination with an external system for 
> taking or restoring checkpoints. When execution is restarted from a 
> checkpoint, {{restoreCheckpoint}} is called to restore or reinitialize the 
> external system state. There's an edge case where the external state is not 
> adequately reinitialized, that is when execution fails _before the first 
> checkpoint_. In that case, the hook is not invoked and has no opportunity to 
> restore the external state to initial conditions.
> The impact is a loss of exactly-once semantics in this case. For example, in 
> the Pravega source function, the reader group state (e.g. stream position 
> data) is stored externally. In the normal restore case, the reader group 
> state is forcibly rewound to the checkpointed position. In the edge case 
> where no checkpoint has yet been successful, the reader group state is not 
> rewound and consequently some amount of stream data is not reprocessed.
> A possible fix would be to introduce an {{initializeState}} method on the 
> hook interface. Similar to {{CheckpointedFunction::initializeState}}, this 
> method would be invoked unconditionally upon hook initialization. The Pravega 
> hook would, for example, initialize or forcibly reinitialize the reader group 
> state.    



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization

2018-03-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16419889#comment-16419889
 ] 

ASF GitHub Bot commented on FLINK-8533:
---

Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/5427
  
@tillrohrmann @StephanEwen sorry about the long delay here, would you 
please take another look?

I followed Stephan's suggestion of not introducing a new method.   However, 
the semantics that I was shooting for with `initializeState` is that it would 
be called on both _start_ and _restart_.  I adjusted `JobManager` to call 
`restoreLatestCheckpointedState` on first execution (as does `JobMaster`).  Are 
you OK with that?


> Support MasterTriggerRestoreHook state reinitialization
> ---
>
> Key: FLINK-8533
> URL: https://issues.apache.org/jira/browse/FLINK-8533
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Major
>
> {{MasterTriggerRestoreHook}} enables coordination with an external system for 
> taking or restoring checkpoints. When execution is restarted from a 
> checkpoint, {{restoreCheckpoint}} is called to restore or reinitialize the 
> external system state. There's an edge case where the external state is not 
> adequately reinitialized, that is when execution fails _before the first 
> checkpoint_. In that case, the hook is not invoked and has no opportunity to 
> restore the external state to initial conditions.
> The impact is a loss of exactly-once semantics in this case. For example, in 
> the Pravega source function, the reader group state (e.g. stream position 
> data) is stored externally. In the normal restore case, the reader group 
> state is forcibly rewound to the checkpointed position. In the edge case 
> where no checkpoint has yet been successful, the reader group state is not 
> rewound and consequently some amount of stream data is not reprocessed.
> A possible fix would be to introduce an {{initializeState}} method on the 
> hook interface. Similar to {{CheckpointedFunction::initializeState}}, this 
> method would be invoked unconditionally upon hook initialization. The Pravega 
> hook would, for example, initialize or forcibly reinitialize the reader group 
> state.    



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365234#comment-16365234
 ] 

ASF GitHub Bot commented on FLINK-8533:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5427
  
I think we are talking about two different things here:

  1. We DO need a new method on the hook interface to reinitialize the 
reader group. I think the one suggested by Eron works.
  2. We DO NOT need a new method on the CheckpointCoordinator, but calling 
`Hook.initializeState()` within `CC.restoreLatestCheckpointedState()` (whenever 
there is no checkpoint to be restored) should work.

==> Let's add the Hook method, but not add an additional method to the 
CheckpointCoordinator.


> Support MasterTriggerRestoreHook state reinitialization
> ---
>
> Key: FLINK-8533
> URL: https://issues.apache.org/jira/browse/FLINK-8533
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Major
>
> {{MasterTriggerRestoreHook}} enables coordination with an external system for 
> taking or restoring checkpoints. When execution is restarted from a 
> checkpoint, {{restoreCheckpoint}} is called to restore or reinitialize the 
> external system state. There's an edge case where the external state is not 
> adequately reinitialized, that is when execution fails _before the first 
> checkpoint_. In that case, the hook is not invoked and has no opportunity to 
> restore the external state to initial conditions.
> The impact is a loss of exactly-once semantics in this case. For example, in 
> the Pravega source function, the reader group state (e.g. stream position 
> data) is stored externally. In the normal restore case, the reader group 
> state is forcibly rewound to the checkpointed position. In the edge case 
> where no checkpoint has yet been successful, the reader group state is not 
> rewound and consequently some amount of stream data is not reprocessed.
> A possible fix would be to introduce an {{initializeState}} method on the 
> hook interface. Similar to {{CheckpointedFunction::initializeState}}, this 
> method would be invoked unconditionally upon hook initialization. The Pravega 
> hook would, for example, initialize or forcibly reinitialize the reader group 
> state.    



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364336#comment-16364336
 ] 

ASF GitHub Bot commented on FLINK-8533:
---

Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/5427
  
I feel that we're not addressing the core issue that we're trying to fix.   
1. A new job starts up with checkpointing enabled and a hook-based source.
2. The source begins to consume events, causing some external state to 
become mutated.
3. _Before the first checkpoint_, a task throws an exception, causing a 
global restart.
4. Since the hook has no opportunity to rewind the external state to 
initial conditions, data loss occurs.

The above is a special case.  In the normal case, one or more checkpoints 
have occurred before the restart occurs, and so the hook's `restore` method is 
effective.



> Support MasterTriggerRestoreHook state reinitialization
> ---
>
> Key: FLINK-8533
> URL: https://issues.apache.org/jira/browse/FLINK-8533
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Major
>
> {{MasterTriggerRestoreHook}} enables coordination with an external system for 
> taking or restoring checkpoints. When execution is restarted from a 
> checkpoint, {{restoreCheckpoint}} is called to restore or reinitialize the 
> external system state. There's an edge case where the external state is not 
> adequately reinitialized, that is when execution fails _before the first 
> checkpoint_. In that case, the hook is not invoked and has no opportunity to 
> restore the external state to initial conditions.
> The impact is a loss of exactly-once semantics in this case. For example, in 
> the Pravega source function, the reader group state (e.g. stream position 
> data) is stored externally. In the normal restore case, the reader group 
> state is forcibly rewound to the checkpointed position. In the edge case 
> where no checkpoint has yet been successful, the reader group state is not 
> rewound and consequently some amount of stream data is not reprocessed.
> A possible fix would be to introduce an {{initializeState}} method on the 
> hook interface. Similar to {{CheckpointedFunction::initializeState}}, this 
> method would be invoked unconditionally upon hook initialization. The Pravega 
> hook would, for example, initialize or forcibly reinitialize the reader group 
> state.    



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16363764#comment-16363764
 ] 

ASF GitHub Bot commented on FLINK-8533:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5427
  
@EronWright, you're right that on initial submission we don't call 
`restoreLatestCheckpointedState` in the old code. With Flip-6 this will be the 
case. See #5444.

The underlying assumption to make this work, though, is that a user won't 
submit a new job with the a job id to a cluster with a cluster id for which 
ZooKeeper already contains persisted checkpoints from a previous run. So either 
the cluster id or the job id must be different. 

I think so far, when using the Flink client this should be the case. 
However, when generating the `JobGraph` yourself and keeping it around to 
submit it to a standalone cluster, then this assumption will break because both 
the `JobID` and the cluster id will be the same.


> Support MasterTriggerRestoreHook state reinitialization
> ---
>
> Key: FLINK-8533
> URL: https://issues.apache.org/jira/browse/FLINK-8533
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Major
>
> {{MasterTriggerRestoreHook}} enables coordination with an external system for 
> taking or restoring checkpoints. When execution is restarted from a 
> checkpoint, {{restoreCheckpoint}} is called to restore or reinitialize the 
> external system state. There's an edge case where the external state is not 
> adequately reinitialized, that is when execution fails _before the first 
> checkpoint_. In that case, the hook is not invoked and has no opportunity to 
> restore the external state to initial conditions.
> The impact is a loss of exactly-once semantics in this case. For example, in 
> the Pravega source function, the reader group state (e.g. stream position 
> data) is stored externally. In the normal restore case, the reader group 
> state is forcibly rewound to the checkpointed position. In the edge case 
> where no checkpoint has yet been successful, the reader group state is not 
> rewound and consequently some amount of stream data is not reprocessed.
> A possible fix would be to introduce an {{initializeState}} method on the 
> hook interface. Similar to {{CheckpointedFunction::initializeState}}, this 
> method would be invoked unconditionally upon hook initialization. The Pravega 
> hook would, for example, initialize or forcibly reinitialize the reader group 
> state.    



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362722#comment-16362722
 ] 

ASF GitHub Bot commented on FLINK-8533:
---

Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/5427
  
@StephanEwen do you mean that we could avoid adding an initialization 
method to the hook interface?  We need to somehow call into the hook to reset 
its state, even when there's no checkpoint data to work with.   Could you 
rephrase your suggestion?  Thanks.


> Support MasterTriggerRestoreHook state reinitialization
> ---
>
> Key: FLINK-8533
> URL: https://issues.apache.org/jira/browse/FLINK-8533
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Major
>
> {{MasterTriggerRestoreHook}} enables coordination with an external system for 
> taking or restoring checkpoints. When execution is restarted from a 
> checkpoint, {{restoreCheckpoint}} is called to restore or reinitialize the 
> external system state. There's an edge case where the external state is not 
> adequately reinitialized, that is when execution fails _before the first 
> checkpoint_. In that case, the hook is not invoked and has no opportunity to 
> restore the external state to initial conditions.
> The impact is a loss of exactly-once semantics in this case. For example, in 
> the Pravega source function, the reader group state (e.g. stream position 
> data) is stored externally. In the normal restore case, the reader group 
> state is forcibly rewound to the checkpointed position. In the edge case 
> where no checkpoint has yet been successful, the reader group state is not 
> rewound and consequently some amount of stream data is not reprocessed.
> A possible fix would be to introduce an {{initializeState}} method on the 
> hook interface. Similar to {{CheckpointedFunction::initializeState}}, this 
> method would be invoked unconditionally upon hook initialization. The Pravega 
> hook would, for example, initialize or forcibly reinitialize the reader group 
> state.    



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362662#comment-16362662
 ] 

ASF GitHub Bot commented on FLINK-8533:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5427
  
I think we may get around adding a new method. I checked with @tillrohrmann 
, here are the thoughts:

  - Submitting a job initially as a new reader group, no need to reset here
  - Recovering at any point calls the `restoreLatestCheckpointedState()` 
method
  - Also recovering from a JobManager failover basically "resubmits" the 
job with a special flag causing the JM to call the 
`restoreLatestCheckpointedState()` for the job.


> Support MasterTriggerRestoreHook state reinitialization
> ---
>
> Key: FLINK-8533
> URL: https://issues.apache.org/jira/browse/FLINK-8533
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Major
>
> {{MasterTriggerRestoreHook}} enables coordination with an external system for 
> taking or restoring checkpoints. When execution is restarted from a 
> checkpoint, {{restoreCheckpoint}} is called to restore or reinitialize the 
> external system state. There's an edge case where the external state is not 
> adequately reinitialized, that is when execution fails _before the first 
> checkpoint_. In that case, the hook is not invoked and has no opportunity to 
> restore the external state to initial conditions.
> The impact is a loss of exactly-once semantics in this case. For example, in 
> the Pravega source function, the reader group state (e.g. stream position 
> data) is stored externally. In the normal restore case, the reader group 
> state is forcibly rewound to the checkpointed position. In the edge case 
> where no checkpoint has yet been successful, the reader group state is not 
> rewound and consequently some amount of stream data is not reprocessed.
> A possible fix would be to introduce an {{initializeState}} method on the 
> hook interface. Similar to {{CheckpointedFunction::initializeState}}, this 
> method would be invoked unconditionally upon hook initialization. The Pravega 
> hook would, for example, initialize or forcibly reinitialize the reader group 
> state.    



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361295#comment-16361295
 ] 

ASF GitHub Bot commented on FLINK-8533:
---

Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/5427
  
@StephanEwen thanks for taking a look, I agree with trying to avoid a new 
lifecycle method.

The `initializeState` method on the hook interface gives the hook an 
unconditional initialization point. 
In the Pravega case, we would move reader-group (RG) initialization from 
client to server, and always reset the RG to its initial conditions.   A 
subsequent restore may or may not occur.

Assuming we like this approach, let's discuss how to make it work purely 
with `restoreLatestCheckpointedState`.   The `restoreLatestCheckpointedState` 
method is not called by the ExecutionGraph (EG) upon initial execution, which 
we would want to support the new `initializeState` method.   Would there be any 
issue with calling `restoreLatestCheckpointedState` on initial execution? Such 
symmetry would seem desirable.  

**Existing approach**:
```
=== initial ===
\-- JM.submitJob
|   \-- EG.scheduleForExecution

=== restart===
\-- RestartCallback.triggerFullRecovery
|   \-- EG.restart
|   |   \-- CC.restoreLatestCheckpointedState
|   |   \-- EG.scheduleForExecution
```

**Suggested approach**:
```
=== initial ===
\-- JM.submitJob
|   \-- EG.start  (** new method **)
|   |   \-- CC.restoreLatestCheckpointedState
|   |   |   \-- Hook.initializeState
|   |   \-- EG.scheduleForExecution

=== restart===
\-- RestartCallback.triggerFullRecovery
|   \-- EG.restart
|   |   \-- CC.restoreLatestCheckpointedState
|   |   |   \-- Hook.initializeState
|   |   \-- EG.scheduleForExecution
```


> Support MasterTriggerRestoreHook state reinitialization
> ---
>
> Key: FLINK-8533
> URL: https://issues.apache.org/jira/browse/FLINK-8533
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Major
>
> {{MasterTriggerRestoreHook}} enables coordination with an external system for 
> taking or restoring checkpoints. When execution is restarted from a 
> checkpoint, {{restoreCheckpoint}} is called to restore or reinitialize the 
> external system state. There's an edge case where the external state is not 
> adequately reinitialized, that is when execution fails _before the first 
> checkpoint_. In that case, the hook is not invoked and has no opportunity to 
> restore the external state to initial conditions.
> The impact is a loss of exactly-once semantics in this case. For example, in 
> the Pravega source function, the reader group state (e.g. stream position 
> data) is stored externally. In the normal restore case, the reader group 
> state is forcibly rewound to the checkpointed position. In the edge case 
> where no checkpoint has yet been successful, the reader group state is not 
> rewound and consequently some amount of stream data is not reprocessed.
> A possible fix would be to introduce an {{initializeState}} method on the 
> hook interface. Similar to {{CheckpointedFunction::initializeState}}, this 
> method would be invoked unconditionally upon hook initialization. The Pravega 
> hook would, for example, initialize or forcibly reinitialize the reader group 
> state.    



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361128#comment-16361128
 ] 

ASF GitHub Bot commented on FLINK-8533:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5427
  
Given the already non-trivial complexity of the `CheckpointCoordinator`, I 
am wondering if there is a way to do this without adding the 
`resetForNewExecution()` method.

Adding another life cycle method (even if it is not exploited for other 
purposes currently) would need more involved tests and make future maintenance 
harder.

Can we reset all hooks in the `restoreLatestCheckpointedState()` method? 
Would we miss any cases if we do only that? 


> Support MasterTriggerRestoreHook state reinitialization
> ---
>
> Key: FLINK-8533
> URL: https://issues.apache.org/jira/browse/FLINK-8533
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Major
>
> {{MasterTriggerRestoreHook}} enables coordination with an external system for 
> taking or restoring checkpoints. When execution is restarted from a 
> checkpoint, {{restoreCheckpoint}} is called to restore or reinitialize the 
> external system state. There's an edge case where the external state is not 
> adequately reinitialized, that is when execution fails _before the first 
> checkpoint_. In that case, the hook is not invoked and has no opportunity to 
> restore the external state to initial conditions.
> The impact is a loss of exactly-once semantics in this case. For example, in 
> the Pravega source function, the reader group state (e.g. stream position 
> data) is stored externally. In the normal restore case, the reader group 
> state is forcibly rewound to the checkpointed position. In the edge case 
> where no checkpoint has yet been successful, the reader group state is not 
> rewound and consequently some amount of stream data is not reprocessed.
> A possible fix would be to introduce an {{initializeState}} method on the 
> hook interface. Similar to {{CheckpointedFunction::initializeState}}, this 
> method would be invoked unconditionally upon hook initialization. The Pravega 
> hook would, for example, initialize or forcibly reinitialize the reader group 
> state.    



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization

2018-02-11 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360404#comment-16360404
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8533:


[~eronwright]thanks for the heads up on this ticket.

Regarding your comment on the Kafka connector's `setStartFromGroupOffsets`:
The problem should, AFAIK, not occur there because we differentiate between 
having some restored state / no restored state, when determining whether or not 
to respect the `StartupMode`.

But yes, for the Pravega connector, I think this is valid. Will take a look at 
the pull request soon.

> Support MasterTriggerRestoreHook state reinitialization
> ---
>
> Key: FLINK-8533
> URL: https://issues.apache.org/jira/browse/FLINK-8533
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Major
>
> {{MasterTriggerRestoreHook}} enables coordination with an external system for 
> taking or restoring checkpoints. When execution is restarted from a 
> checkpoint, {{restoreCheckpoint}} is called to restore or reinitialize the 
> external system state. There's an edge case where the external state is not 
> adequately reinitialized, that is when execution fails _before the first 
> checkpoint_. In that case, the hook is not invoked and has no opportunity to 
> restore the external state to initial conditions.
> The impact is a loss of exactly-once semantics in this case. For example, in 
> the Pravega source function, the reader group state (e.g. stream position 
> data) is stored externally. In the normal restore case, the reader group 
> state is forcibly rewound to the checkpointed position. In the edge case 
> where no checkpoint has yet been successful, the reader group state is not 
> rewound and consequently some amount of stream data is not reprocessed.
> A possible fix would be to introduce an {{initializeState}} method on the 
> hook interface. Similar to {{CheckpointedFunction::initializeState}}, this 
> method would be invoked unconditionally upon hook initialization. The Pravega 
> hook would, for example, initialize or forcibly reinitialize the reader group 
> state.    



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization

2018-02-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356450#comment-16356450
 ] 

ASF GitHub Bot commented on FLINK-8533:
---

Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/5427
  
@tzulitai @StephanEwen please take a look, thanks.


> Support MasterTriggerRestoreHook state reinitialization
> ---
>
> Key: FLINK-8533
> URL: https://issues.apache.org/jira/browse/FLINK-8533
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Major
>
> {{MasterTriggerRestoreHook}} enables coordination with an external system for 
> taking or restoring checkpoints. When execution is restarted from a 
> checkpoint, {{restoreCheckpoint}} is called to restore or reinitialize the 
> external system state. There's an edge case where the external state is not 
> adequately reinitialized, that is when execution fails _before the first 
> checkpoint_. In that case, the hook is not invoked and has no opportunity to 
> restore the external state to initial conditions.
> The impact is a loss of exactly-once semantics in this case. For example, in 
> the Pravega source function, the reader group state (e.g. stream position 
> data) is stored externally. In the normal restore case, the reader group 
> state is forcibly rewound to the checkpointed position. In the edge case 
> where no checkpoint has yet been successful, the reader group state is not 
> rewound and consequently some amount of stream data is not reprocessed.
> A possible fix would be to introduce an {{initializeState}} method on the 
> hook interface. Similar to {{CheckpointedFunction::initializeState}}, this 
> method would be invoked unconditionally upon hook initialization. The Pravega 
> hook would, for example, initialize or forcibly reinitialize the reader group 
> state.    



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization

2018-02-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356449#comment-16356449
 ] 

ASF GitHub Bot commented on FLINK-8533:
---

GitHub user EronWright opened a pull request:

https://github.com/apache/flink/pull/5427

[FLINK-8533] [checkpointing] Support MasterTriggerRestoreHook state 
reinitialization


Signed-off-by: Eron Wright 

## What is the purpose of the change
Support MasterTriggerRestoreHook state re-initialization, to eliminate an 
edge case involving execution restarts where no checkpoint state is available.

## Brief change log
- extend `MasterTriggerRestoreHook` with `initializeState` method.
- invoke `initializeState` upon initial execution and upon global restart.

## Verifying this change
- Revised test: `CheckpointCoordinatorMasterHooksTest`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency):  no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`:  **yes**
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **yes**
  - The S3 file system connector:  no

## Documentation
  - Does this pull request introduce a new feature?  no



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/EronWright/flink 
FLINK-8533-hook-initialization

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5427.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5427


commit 9ad0e03c1aa81012ae14f598acdcf3eb76c9ec9f
Author: Eron Wright 
Date:   2018-02-08T03:38:47Z

[FLINK-8533] Support MasterTriggerRestoreHook state reinitialization

- extend `MasterTriggerRestoreHook` with `initializeState` method.
- invoke `initializeState` upon initial execution and upon global
restart.

Signed-off-by: Eron Wright 




> Support MasterTriggerRestoreHook state reinitialization
> ---
>
> Key: FLINK-8533
> URL: https://issues.apache.org/jira/browse/FLINK-8533
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Major
>
> {{MasterTriggerRestoreHook}} enables coordination with an external system for 
> taking or restoring checkpoints. When execution is restarted from a 
> checkpoint, {{restoreCheckpoint}} is called to restore or reinitialize the 
> external system state. There's an edge case where the external state is not 
> adequately reinitialized, that is when execution fails _before the first 
> checkpoint_. In that case, the hook is not invoked and has no opportunity to 
> restore the external state to initial conditions.
> The impact is a loss of exactly-once semantics in this case. For example, in 
> the Pravega source function, the reader group state (e.g. stream position 
> data) is stored externally. In the normal restore case, the reader group 
> state is forcibly rewound to the checkpointed position. In the edge case 
> where no checkpoint has yet been successful, the reader group state is not 
> rewound and consequently some amount of stream data is not reprocessed.
> A possible fix would be to introduce an {{initializeState}} method on the 
> hook interface. Similar to {{CheckpointedFunction::initializeState}}, this 
> method would be invoked unconditionally upon hook initialization. The Pravega 
> hook would, for example, initialize or forcibly reinitialize the reader group 
> state.    



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization

2018-01-31 Thread Eron Wright (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348017#comment-16348017
 ] 

Eron Wright  commented on FLINK-8533:
-

Incidentally, a variation on this problem would (I believe) occur when using 
the Kafka consumer and {{setStartFromGroupOffsets}} is enabled.   That's 
because the Kafka connector uses external storage to initialize its state (i.e. 
the starting position) in the non-restore case.

> Support MasterTriggerRestoreHook state reinitialization
> ---
>
> Key: FLINK-8533
> URL: https://issues.apache.org/jira/browse/FLINK-8533
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Major
>
> {{MasterTriggerRestoreHook}} enables coordination with an external system for 
> taking or restoring checkpoints. When execution is restarted from a 
> checkpoint, {{restoreCheckpoint}} is called to restore or reinitialize the 
> external system state. There's an edge case where the external state is not 
> adequately reinitialized, that is when execution fails _before the first 
> checkpoint_. In that case, the hook is not invoked and has no opportunity to 
> restore the external state to initial conditions.
> The impact is a loss of exactly-once semantics in this case. For example, in 
> the Pravega source function, the reader group state (e.g. stream position 
> data) is stored externally. In the normal restore case, the reader group 
> state is forcibly rewound to the checkpointed position. In the edge case 
> where no checkpoint has yet been successful, the reader group state is not 
> rewound and consequently some amount of stream data is not reprocessed.
> A possible fix would be to introduce an {{initializeState}} method on the 
> hook interface. Similar to {{CheckpointedFunction::initializeState}}, this 
> method would be invoked unconditionally upon hook initialization. The Pravega 
> hook would, for example, initialize or forcibly reinitialize the reader group 
> state.    



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization

2018-01-31 Thread Eron Wright (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346400#comment-16346400
 ] 

Eron Wright  commented on FLINK-8533:
-

This relates to [issue 
#89|https://github.com/pravega/flink-connectors/issues/89] in flink-connectors. 
cc [~tzulitai] 

> Support MasterTriggerRestoreHook state reinitialization
> ---
>
> Key: FLINK-8533
> URL: https://issues.apache.org/jira/browse/FLINK-8533
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Major
>
> {{MasterTriggerRestoreHook}} enables coordination with an external system for 
> taking or restoring checkpoints. When execution is restarted from a 
> checkpoint, {{restoreCheckpoint}} is called to restore or reinitialize the 
> external system state. There's an edge case where the external state is not 
> adequately reinitialized, that is when execution fails _before the first 
> checkpoint_. In that case, the hook is not invoked and has no opportunity to 
> restore the external state to initial conditions.
> The impact is a loss of exactly-once semantics in this case. For example, in 
> the Pravega source function, the reader group state (e.g. stream position 
> data) is stored externally. In the normal restore case, the reader group 
> state is forcibly rewound to the checkpointed position. In the edge case 
> where no checkpoint has yet been successful, the reader group state is not 
> rewound and consequently some amount of stream data is not reprocessed.
> A possible fix would be to introduce an {{initializeState}} method on the 
> hook interface. Similar to {{CheckpointedFunction::initializeState}}, this 
> method would be invoked unconditionally upon hook initialization. The Pravega 
> hook would, for example, initialize or forcibly reinitialize the reader group 
> state.    



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)