Re: Self-checkpoint Support on Portable Flink

2020-10-23 Thread Boyuan Zhang


Re: Self-checkpoint Support on Portable Flink

2020-10-14 Thread Maximilian Michels
Duplicates cannot happen: in case of failure, the state of all operators is
rolled back to the latest checkpoint.
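To make the rollback argument concrete, here is a toy Python model (nothing below is Beam or Flink API). A single restriction is processed three offsets at a time and its residual is fed back to the same operator. Each checkpoint snapshots the pending residuals and the sink output together. The function run_job and its parameters are hypothetical, and rollback_sink=False is an assumed counter-example in which one component keeps its post-checkpoint effects.

```python
import copy


def run_job(end, chunk=3, checkpoint_every=2, fail_after=None, rollback_sink=True):
    """Toy model: restriction [0, end) is processed `chunk` offsets at a time
    and its residual is fed back to the same operator. A checkpoint snapshots
    the pending residuals and the sink output together."""
    job = {"residuals": [(0, end)], "sink": []}
    snapshot = copy.deepcopy(job)
    steps = 0
    failed = False
    while job["residuals"]:
        start, limit = job["residuals"].pop(0)
        stop = min(start + chunk, limit)
        job["sink"].extend(range(start, stop))
        if stop < limit:
            job["residuals"].append((stop, limit))  # residual goes back in
        steps += 1
        if steps % checkpoint_every == 0:
            snapshot = copy.deepcopy(job)  # consistent snapshot of everything
        if not failed and steps == fail_after:
            failed = True  # simulate a single failure
            restored = copy.deepcopy(snapshot)
            if not rollback_sink:
                # Counter-example: a sink that keeps its post-checkpoint writes.
                restored["sink"] = job["sink"]
            job = restored
    return job["sink"]


print(run_job(10, fail_after=3))                       # each offset exactly once
print(run_job(10, fail_after=3, rollback_sink=False))  # 6, 7, 8 appear twice
```

The replay after the failure re-processes offsets 6 to 8, but because the sink was rolled back to the same checkpoint, their earlier effects are gone and nothing is counted twice.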


On 14.10.20 06:31, Reuven Lax wrote:
> Does this mean that we have to deal with duplicate messages over the
> back edge? Or will that not happen, since duplicates mean that we rolled
> back a checkpoint?


Re: Self-checkpoint Support on Portable Flink

2020-10-13 Thread Reuven Lax
Does this mean that we have to deal with duplicate messages over the back
edge? Or will that not happen, since duplicates mean that we rolled back a
checkpoint?

On Tue, Oct 13, 2020 at 2:59 AM Maximilian Michels wrote:

> There would be ways around the lack of checkpointing in cycles, e.g.
> buffer and backloop only after checkpointing is complete, similar to how
> we implement @RequiresStableInput in the Flink Runner.
>
> -Max


Re: Self-checkpoint Support on Portable Flink

2020-10-13 Thread Maximilian Michels
There would be ways around the lack of checkpointing in cycles, e.g.
buffer and backloop only after checkpointing is complete, similar to how
we implement @RequiresStableInput in the Flink Runner.

-Max
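A minimal sketch of the buffering idea, assuming residuals destined for the back edge are kept in checkpointed operator state and only emitted from a completion callback modeled loosely on Flink's CheckpointListener.notifyCheckpointComplete. BackEdgeBuffer and its method names are hypothetical. This is plain Python, not the Flink Runner's @RequiresStableInput implementation.

```python
class BackEdgeBuffer:
    """Hypothetical sketch: residuals bound for the back edge are kept in
    operator state and only emitted once the checkpoint containing them
    has completed."""

    def __init__(self):
        self._pending = []         # residuals produced since the last barrier
        self._by_checkpoint = {}   # checkpoint id -> residuals stored in it

    def add(self, residual):
        self._pending.append(residual)

    def snapshot_state(self, checkpoint_id):
        # The buffered residuals become part of this checkpoint's state.
        self._by_checkpoint[checkpoint_id] = self._pending
        self._pending = []

    def notify_checkpoint_complete(self, checkpoint_id):
        # Release residuals of this and any earlier checkpoint, oldest first.
        ready = sorted(c for c in self._by_checkpoint if c <= checkpoint_id)
        released = []
        for cid in ready:
            released.extend(self._by_checkpoint.pop(cid))
        return released


buf = BackEdgeBuffer()
buf.add("residual-1")
buf.snapshot_state(1)
buf.add("residual-2")                     # arrives after barrier 1
print(buf.notify_checkpoint_complete(1))  # ['residual-1']
```

In this design the state stored for checkpoint 1 contains residual-1, so a restore from checkpoint 1 would release it again instead of leaving it stranded on the loop edge; residual-2 waits until a later checkpoint covers it.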

On 07.10.20 04:05, Reuven Lax wrote:
> It appears that there's a proposal
> (https://cwiki.apache.org/confluence/display/FLINK/FLIP-16%3A+Loop+Fault+Tolerance)
> and an abandoned PR to fix this, but AFAICT this remains a limitation of
> Flink. If Flink can't guarantee processing of records on back edges, I
> don't think we can use cycles, as we might otherwise lose the residuals.


Re: Self-checkpoint Support on Portable Flink

2020-10-06 Thread Reuven Lax
This is what I was thinking of:

"Flink currently only provides processing guarantees for jobs without
iterations. Enabling checkpointing on an iterative job causes an exception.
In order to force checkpointing on an iterative program the user needs to
set a special flag when enabling checkpointing:
env.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE, force = true).

Please note that records in flight in the loop edges (and the state changes
associated with them) will be lost during failure."
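A toy Python model of the quoted limitation, under the simplifying assumption that the source has already moved past the original element: operator output is part of the checkpoint, but the residual travelling on the loop edge is not, so failing after the checkpoint drops it. run_with_loop_edge and its parameters are illustrative names only.

```python
import copy
from collections import deque


def run_with_loop_edge(end, chunk=3, checkpoint_after=1, fail_after=2):
    """Toy model: `output` is operator state and IS checkpointed; residuals
    in the loop-edge queue are in flight and are NOT part of the checkpoint."""
    output = []
    loop_edge = deque([(0, end)])
    snapshot = []
    steps = 0
    while loop_edge:
        start, limit = loop_edge.popleft()
        stop = min(start + chunk, limit)
        output.extend(range(start, stop))
        if stop < limit:
            loop_edge.append((stop, limit))  # residual sent over the back edge
        steps += 1
        if steps == checkpoint_after:
            snapshot = copy.deepcopy(output)
        if steps == fail_after:
            output = copy.deepcopy(snapshot)  # state is restored...
            loop_edge.clear()                 # ...but in-flight records are gone
    return output


print(run_with_loop_edge(10))                   # [0, 1, 2]: offsets 3..9 lost
print(run_with_loop_edge(10, fail_after=None))  # all ten offsets
```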






On Tue, Oct 6, 2020 at 5:44 PM Boyuan Zhang wrote:

> Hi Reuven,
>
> As Luke mentioned, there are at least some limitations around tracking
> watermarks with Flink cycles. I'm going to use State + Timer, without a
> Flink cycle, to support self-checkpoint. For dynamic splitting, we can
> explore either the Flink cycle approach or the limit-depth approach.


Re: Self-checkpoint Support on Portable Flink

2020-10-06 Thread Boyuan Zhang
Hi Reuven,

As Luke mentioned, there are at least some limitations around tracking
watermarks with Flink cycles. I'm going to use State + Timer, without a
Flink cycle, to support self-checkpoint. For dynamic splitting, we can
explore either the Flink cycle approach or the limit-depth approach.
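A rough sketch of the State + Timer approach in plain Python, assuming at most one outstanding residual per key: rather than sending the residual over a back edge, the operator stores it in keyed state and sets a timer for now + resume_delay, which also models delaying a bundle. TimerResumingOperator and its methods are hypothetical; in a Flink job the keyed state and timers would be included in checkpoints, which is what keeps the residual from being lost.

```python
import heapq


class TimerResumingOperator:
    """Hypothetical sketch: a residual is parked in keyed state and resumed by
    a timer, so no back edge (cycle) is needed in the job graph. Assumes at
    most one outstanding residual per key."""

    def __init__(self, chunk, resume_delay):
        self.chunk = chunk                # offsets processed before self-checkpointing
        self.resume_delay = resume_delay  # delay requested for the residual
        self.state = {}                   # key -> residual restriction (start, end)
        self.timers = []                  # min-heap of (fire_time, key)
        self.output = []

    def process_element(self, key, restriction, now):
        start, end = restriction
        stop = min(start + self.chunk, end)
        self.output.extend((key, i) for i in range(start, stop))
        if stop < end:
            # Self-checkpoint: store the residual and ask to be called back.
            self.state[key] = (stop, end)
            heapq.heappush(self.timers, (now + self.resume_delay, key))
        else:
            self.state.pop(key, None)     # restriction fully processed

    def advance_time(self, now):
        # Fire every timer whose timestamp is <= now, earliest first.
        while self.timers and self.timers[0][0] <= now:
            _, key = heapq.heappop(self.timers)
            self.process_element(key, self.state[key], now)


op = TimerResumingOperator(chunk=4, resume_delay=5)
op.process_element("a", (0, 10), now=0)  # emits offsets 0..3, parks (4, 10)
op.advance_time(5)                        # emits offsets 4..7, parks (8, 10)
op.advance_time(10)                       # emits offsets 8, 9; state is empty
print(op.state, len(op.output))           # {} 10
```

The trade-off is an extra state write and a timer per self-checkpoint, in exchange for keeping the job graph acyclic.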

On Tue, Oct 6, 2020 at 5:33 PM Reuven Lax wrote:

> Aren't there some limitations associated with Flink cycles? I seem to
> remember various features that could not be used. I'm assuming that
> watermarks are not supported across cycles, but is there anything else?


Re: Self-checkpoint Support on Portable Flink

2020-10-06 Thread Maximilian Michels
Thanks for starting the conversation. The two approaches both look good
to me. We probably want to start with approach #1 so that all Runners are
able to support delaying bundles. Flink supports cycles, so approach #2
would also be applicable and could be used to implement dynamic splitting.

-Max

On 05.10.20 23:13, Luke Cwik wrote:

> Thanks Boyuan, I left a few comments.



Re: Self-checkpoint Support on Portable Flink

2020-10-05 Thread Luke Cwik
Thanks Boyuan, I left a few comments.

On Mon, Oct 5, 2020 at 11:12 AM Boyuan Zhang wrote:

> Hi team,
>
> I'm looking at adding self-checkpoint support to the portable Flink runner
> (BEAM-10940 <https://issues.apache.org/jira/browse/BEAM-10940>) for both
> batch and streaming. I summarized the problem that we want to solve and
> proposed two potential approaches in this doc:
> <https://docs.google.com/document/d/1372B7HYxtcUYjZOnOM7OBTfSJ4CyFg_gaPD_NUxWClo/edit?usp=sharing>
>
> I want to collect feedback on which approach is preferred, and on anything
> I should take into consideration that I have not yet.
> Many thanks for all your help!
>
> Boyuan