Re: Self-checkpoint Support on Portable Flink
Duplicates cannot happen because the state of all operators will be rolled back to the latest checkpoint in case of failures.

On 14.10.20 06:31, Reuven Lax wrote:
> Does this mean that we have to deal with duplicate messages over the back
> edge? Or will that not happen, since duplicates mean that we rolled back a
> checkpoint?
Re: Self-checkpoint Support on Portable Flink
Does this mean that we have to deal with duplicate messages over the back edge? Or will that not happen, since duplicates mean that we rolled back a checkpoint?

On Tue, Oct 13, 2020 at 2:59 AM Maximilian Michels wrote:
> There would be ways around the lack of checkpointing in cycles, e.g.
> buffer and loop back only after checkpointing is complete, similar to how
> we implement @RequiresStableInput in the Flink Runner.
>
> -Max
Re: Self-checkpoint Support on Portable Flink
There would be ways around the lack of checkpointing in cycles, e.g. buffer and loop back only after checkpointing is complete, similar to how we implement @RequiresStableInput in the Flink Runner.

-Max

On 07.10.20 04:05, Reuven Lax wrote:
> It appears that there's a proposal
> (https://cwiki.apache.org/confluence/display/FLINK/FLIP-16%3A+Loop+Fault+Tolerance)
> and an abandoned PR to fix this, but AFAICT this remains a limitation of
> Flink. If Flink can't guarantee processing of records on back edges, I
> don't think we can use cycles, as we might otherwise lose the residuals.
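The buffering workaround mentioned in this message can be sketched as a toy Python model. This is not how the Flink Runner implements @RequiresStableInput: BufferedLoopJob and all of its methods are hypothetical names, and the model assumes that residuals are kept in checkpointed state until a checkpoint containing them has completed. It does not model records that are still in flight on the back edge after they have been released.

```python
import copy


class BufferedLoopJob:
    """Toy model: residuals wait in checkpointed state before looping back."""

    def __init__(self):
        self.state = {"pending": []}  # part of every checkpoint (assumption)
        self.back_edge = []           # records released onto the loop edge

    def emit_residual(self, residual):
        # Do not send the residual over the back edge yet; buffer it in state.
        self.state["pending"].append(residual)

    def checkpoint(self):
        # Snapshot the operator state, including the buffered residuals.
        return copy.deepcopy(self.state)

    def on_checkpoint_complete(self, snapshot):
        # Release only the residuals that the completed checkpoint contains.
        released = snapshot["pending"]
        self.back_edge.extend(released)
        # "pending" is append-only, so the snapshot is a prefix of it.
        del self.state["pending"][:len(released)]

    def restore(self, snapshot):
        # After a failure, state is rolled back and in-flight records are gone.
        self.state = copy.deepcopy(snapshot)
        self.back_edge = []
```

In this model a failure between checkpoint() and on_checkpoint_complete() leaves the residual in the restored state, so it is not lost.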
Re: Self-checkpoint Support on Portable Flink
This is what I was thinking of:

"Flink currently only provides processing guarantees for jobs without iterations. Enabling checkpointing on an iterative job causes an exception. In order to force checkpointing on an iterative program the user needs to set a special flag when enabling checkpointing: env.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE, force = true).

Please note that records in flight in the loop edges (and the state changes associated with them) will be lost during failure."

On Tue, Oct 6, 2020 at 5:44 PM Boyuan Zhang wrote:
> Hi Reuven,
>
> As Luke mentioned, at least there are some limitations around tracking
> watermark with flink cycles. I'm going to use State + Timer without flink
> cycle to support self-checkpoint. For dynamic split, we can either explore
> flink cycle approach or limit depth approach.
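To illustrate the limitation quoted in this message, here is a toy Python model. It is not Flink code: ToyLoopJob and its methods are hypothetical, and the model simply assumes that a checkpoint snapshots operator state but not the records in flight on the loop edge, which is what the quoted documentation describes.

```python
import copy


class ToyLoopJob:
    """Toy model of an iterative job whose loop edge is not checkpointed."""

    def __init__(self):
        self.operator_state = {"processed": []}  # included in checkpoints
        self.back_edge = []                      # in flight, NOT checkpointed

    def process(self, element, residual=None):
        # Record the processed element; any residual is sent over the loop.
        self.operator_state["processed"].append(element)
        if residual is not None:
            self.back_edge.append(residual)

    def checkpoint(self):
        # Only operator state is captured (assumption of this model).
        return copy.deepcopy(self.operator_state)

    def restore(self, snapshot):
        # Roll state back; records that were in flight on the loop are lost.
        self.operator_state = copy.deepcopy(snapshot)
        self.back_edge = []


job = ToyLoopJob()
job.process("a[0:3]", residual="a[3:8]")  # residual is now in flight
snapshot = job.checkpoint()
job.restore(snapshot)                      # simulate a failure and recovery
```

After the restore, the state still says that "a[0:3]" was processed, but the residual "a[3:8]" no longer exists anywhere, which is the concern about losing residuals on back edges.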
Re: Self-checkpoint Support on Portable Flink
Hi Reuven,

As Luke mentioned, there are at least some limitations around tracking watermarks with Flink cycles. I'm going to use State + Timer without Flink cycles to support self-checkpoint. For dynamic split, we can either explore the Flink cycle approach or the limit depth approach.

On Tue, Oct 6, 2020 at 5:33 PM Reuven Lax wrote:
> Aren't there some limitations associated with flink cycles? I seem to
> remember various features that could not be used. I'm assuming that
> watermarks are not supported across cycles, but is there anything else?
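A minimal sketch of the State + Timer idea from this message, written as a toy Python model rather than Beam or Flink code. ToyRunner, process, and run_timers are hypothetical names, and the per-bundle limit and the timer delay are assumptions made for illustration. The only point is that a residual kept in state and resumed by a timer needs no cycle in the job graph.

```python
import heapq
from dataclasses import dataclass, field


@dataclass
class ToyRunner:
    """Hypothetical stand-in for a runner; not the Beam or Flink API."""
    state: dict = field(default_factory=dict)   # key -> residual (start, end)
    timers: list = field(default_factory=list)  # min-heap of (fire_time, key)
    output: list = field(default_factory=list)

    def process(self, key, start, end, now, max_per_bundle=3, delay=1):
        # Process at most max_per_bundle positions, then self-checkpoint.
        stop = min(start + max_per_bundle, end)
        self.output.extend((key, i) for i in range(start, stop))
        if stop < end:
            # The residual goes into state; a timer resumes it later.
            self.state[key] = (stop, end)
            heapq.heappush(self.timers, (now + delay, key))

    def run_timers(self):
        # Fire timers in time order; each one resumes a stored residual.
        while self.timers:
            fire_time, key = heapq.heappop(self.timers)
            start, end = self.state.pop(key)
            self.process(key, start, end, now=fire_time)
```

Because both the residual and the timer live in the runner's state in this model, they would be covered by whatever checkpointing the runner already does for state and timers.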
Re: Self-checkpoint Support on Portable Flink
Thanks for starting the conversation. The two approaches both look good to me. We probably want to start with approach #1 so that all Runners can support delaying bundles. Flink supports cycles, and thus approach #2 would also be applicable and could be used to implement dynamic splitting.

-Max

On 05.10.20 23:13, Luke Cwik wrote:
> Thanks Boyuan, I left a few comments.
Re: Self-checkpoint Support on Portable Flink
Thanks Boyuan, I left a few comments.

On Mon, Oct 5, 2020 at 11:12 AM Boyuan Zhang wrote:
> Hi team,
>
> I'm looking at adding self-checkpoint support to portable Flink runner
> (BEAM-10940 <https://issues.apache.org/jira/browse/BEAM-10940>) for both
> batch and streaming. I summarized the problem that we want to solve and
> proposed 2 potential approaches in this doc
> <https://docs.google.com/document/d/1372B7HYxtcUYjZOnOM7OBTfSJ4CyFg_gaPD_NUxWClo/edit?usp=sharing>.
>
> I want to collect feedback on which approach is preferred and anything
> that I have not taken into consideration yet but I should.
> Many thanks to all your help!
>
> Boyuan