Hi Yun,

Thank you for starting the discussion. This will solve one of the
long-standing issues [1] that confuse users. I'm also a big fan of option
3. It also brings us a bit closer to Chandy-Lamport again.

A couple of comments:

1) You call the tasks that get the barriers injected leaf nodes, which
would make the sinks the root nodes. That is very similar to how graphs in
relational algebra are labeled. However, I have the impression that in Flink
we usually iterate from sources to sinks, which makes the sources the root
nodes and the sinks the leaf nodes. That said, I don't know how this is done
in similar cases, so please take this hint with a grain of salt.
2) I'd make the algorithm that finds these subtasks iterative and let the
CheckpointCoordinator react to finished tasks. Let's assume that we inject
the barrier at all root subtasks (initially all sources). In the iterative
algorithm, whenever a root A finishes, we check for each connected downstream
subtask B whether it still has any unfinished upstream task. If not, B
becomes a new root. That would only require touching part of the job graph,
but it would need a callback from the JobManager to the
CheckpointCoordinator.
2b) We also need to be careful about out-of-sync updates: if a root is
about to finish, the CheckpointCoordinator could send the barrier to it, but
by the time the barrier arrives, the subtask has already finished.
3) An implied change is that checkpoints are no longer aborted on
EndOfPartition. That is good, but it might be worth stating explicitly.
4) The interaction between unaligned checkpoints and EndOfPartition is a bit
ambiguous: what happens when an unaligned checkpoint has started and one of
the input channels then receives the EndOfPartition event? From the written
description, it sounds to me like we fall back to an aligned checkpoint for
the whole receiving task. However, that is neither easily possible nor
necessary. IMHO it would be enough to also store the EndOfPartition event in
the channel state.
5) I'd expand the recovery section a bit. It would be the first time that
we recover an incomplete DAG. AFAIK the subtasks are deployed before the
state is recovered, so at some point the finished subtasks either need to be
removed again, or maybe we could even avoid creating them in the first
place.
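
To make 2) a bit more concrete, here is a minimal sketch of the iterative
root tracking in plain Java. It assumes a simplified job graph given as a map
from each subtask to its downstream subtasks, and that the
CheckpointCoordinator receives one finish callback per subtask.
TriggerSetTracker and onSubtaskFinished are hypothetical names, not existing
Flink classes or APIs.

```java
import java.util.*;

// Hypothetical sketch (not Flink API): tracks the running subtasks into which the
// CheckpointCoordinator would inject barriers, updated on every finish callback.
class TriggerSetTracker {
    private final Map<String, List<String>> downstream;
    // Number of upstream subtasks of each subtask that are still running.
    private final Map<String, Integer> runningUpstream = new HashMap<>();
    private final Set<String> finished = new HashSet<>();
    // Current roots; a TreeSet only to make printing deterministic.
    private final Set<String> roots = new TreeSet<>();

    TriggerSetTracker(Map<String, List<String>> downstream) {
        this.downstream = downstream;
        for (String task : downstream.keySet()) {
            runningUpstream.putIfAbsent(task, 0);
        }
        for (List<String> targets : downstream.values()) {
            for (String target : targets) {
                runningUpstream.merge(target, 1, Integer::sum);
            }
        }
        // Initially, the roots are all subtasks without upstream, i.e. the sources.
        for (Map.Entry<String, Integer> e : runningUpstream.entrySet()) {
            if (e.getValue() == 0) {
                roots.add(e.getKey());
            }
        }
    }

    // Hypothetical callback from the JobManager when a subtask finishes.
    void onSubtaskFinished(String task) {
        if (!finished.add(task)) {
            return; // ignore duplicate notifications
        }
        roots.remove(task);
        // Only the direct downstream neighbors of the finished subtask are visited.
        for (String next : downstream.getOrDefault(task, List.of())) {
            int left = runningUpstream.merge(next, -1, Integer::sum);
            if (left == 0 && !finished.contains(next)) {
                roots.add(next); // no unfinished upstream left: B becomes a new root
            }
        }
    }

    Set<String> currentRoots() {
        return Collections.unmodifiableSet(roots);
    }

    public static void main(String[] args) {
        Map<String, List<String>> graph = Map.of(
                "src1", List.of("map"),
                "src2", List.of("map"),
                "map", List.of("sink"));
        TriggerSetTracker tracker = new TriggerSetTracker(graph);
        System.out.println(tracker.currentRoots()); // [src1, src2]
        tracker.onSubtaskFinished("src1");
        System.out.println(tracker.currentRoots()); // [src2]
        tracker.onSubtaskFinished("src2");
        System.out.println(tracker.currentRoots()); // [map]
    }
}
```

Each finish notification only touches the direct downstream neighbors of the
finished subtask. The sketch deliberately ignores the race from 2b), where a
subtask may finish while a barrier sent to it is still in flight.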
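
Similarly, here is a rough model of what I mean in 4). It assumes a single
task with numbered input channels, where the unaligned checkpoint starts when
the first barrier arrives on one channel, and it ignores output buffers and
operator state. UnalignedChannelStateRecorder and its event types are
hypothetical and do not correspond to Flink's actual classes. The point is
that an EndOfPartition arriving before a channel's barrier is simply appended
to that channel's state and the channel is treated as done, without aborting
or falling back to alignment.

```java
import java.util.*;

// Hypothetical model (not Flink API): one unaligned checkpoint on a single task
// with several input channels. Output buffers and operator state are ignored.
class UnalignedChannelStateRecorder {
    enum Kind { BUFFER, BARRIER, END_OF_PARTITION }

    static final class Event {
        final Kind kind;
        final String payload;

        Event(Kind kind, String payload) {
            this.kind = kind;
            this.payload = payload;
        }

        static Event buffer(String payload) { return new Event(Kind.BUFFER, payload); }
        static Event barrier() { return new Event(Kind.BARRIER, null); }
        static Event endOfPartition() { return new Event(Kind.END_OF_PARTITION, null); }

        @Override
        public String toString() {
            return kind == Kind.BUFFER ? payload : kind.name();
        }
    }

    // Persisted in-flight data per channel for this checkpoint.
    private final Map<Integer, List<Event>> channelState = new HashMap<>();
    // Channels whose barrier (or EndOfPartition) has not arrived yet.
    private final Set<Integer> pendingChannels = new HashSet<>();

    // Starts the checkpoint when the first barrier arrives on firstChannel.
    UnalignedChannelStateRecorder(int numChannels, int firstChannel) {
        for (int c = 0; c < numChannels; c++) {
            if (c != firstChannel) {
                pendingChannels.add(c);
                channelState.put(c, new ArrayList<>());
            }
        }
    }

    // Called for every event that arrives on a channel after the checkpoint started.
    void onEvent(int channel, Event event) {
        if (!pendingChannels.contains(channel)) {
            return; // after this channel's barrier: not part of this checkpoint's state
        }
        switch (event.kind) {
            case BUFFER:
                // In-flight data ahead of the barrier becomes channel state
                // (the real task would also keep processing it).
                channelState.get(channel).add(event);
                break;
            case BARRIER:
                pendingChannels.remove(channel);
                break;
            case END_OF_PARTITION:
                // No barrier will ever follow on this channel. Instead of aborting or
                // falling back to alignment, persist the EndOfPartition so that
                // recovery replays it, and stop waiting for this channel.
                channelState.get(channel).add(event);
                pendingChannels.remove(channel);
                break;
        }
    }

    boolean isComplete() {
        return pendingChannels.isEmpty();
    }

    List<Event> stateOf(int channel) {
        return channelState.getOrDefault(channel, List.of());
    }

    public static void main(String[] args) {
        UnalignedChannelStateRecorder r = new UnalignedChannelStateRecorder(2, 0);
        r.onEvent(1, Event.buffer("x"));
        r.onEvent(1, Event.endOfPartition());
        System.out.println(r.isComplete() + " " + r.stateOf(1)); // true [x, END_OF_PARTITION]
    }
}
```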

[1] https://issues.apache.org/jira/browse/FLINK-2491

On Fri, Oct 9, 2020 at 8:22 AM Yun Gao <yungao...@aliyun.com> wrote:

> Hi, devs & users
>
> Very sorry for the garbled formatting; I am resending the discussion below.
>
>
> As discussed in FLIP-131 [1], Flink will make DataStream the unified API for
> processing bounded and unbounded data in both streaming and blocking modes.
> However, one long-standing problem for the streaming mode is that Flink
> currently does not support checkpoints after some tasks have finished, which
> causes problems for bounded or mixed jobs:
>         1.
> Flink exactly-once sinks rely on checkpoints to ensure data won’t be replayed 
> before committed to external systems in streaming mode. If sources are 
> bounded and checkpoints are disabled after some tasks are finished, the data 
> sent after the last checkpoint would always not be able to be committed. This 
> issue has already been reported some times in the user ML[2][3][4] and is 
> future brought up when working on FLIP-143: Unified Sink API [5].
>         2.
> The jobs with both bounded and unbounded sources might have to replay a large 
> amount of records after failover due to no periodic checkpoints are taken 
> after the bounded sources finished.
>
>
> Therefore, we propose to also support checkpoints after some tasks have
> finished. You can find more details in FLIP-147 [6].
>
> Best,
> Yun
>
> [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
> [2] https://lists.apache.org/thread.html/rea1ac2d82f646fcea1395b5738be495f144c5b0312a290a1d4a339c1%40%3Cuser.flink.apache.org%3E
> [3] https://lists.apache.org/thread.html/rad4adeec838093b8b56ae9e2ea6a937a4b1882b53045a12acb7e61ea%40%3Cuser.flink.apache.org%3E
> [4] https://lists.apache.org/thread.html/4cf28a9fa3732dfdd9e673da6233c5288ca80b20d58cee130bf1c141%40%3Cuser.flink.apache.org%3E
> [5] https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
> [6] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
>
> ------------------Original Mail ------------------
> *Sender:*Yun Gao <yungao...@aliyun.com.INVALID>
> *Send Date:*Fri Oct 9 14:16:52 2020
> *Recipients:*Flink Dev <d...@flink.apache.org>, User-Flink <
> user@flink.apache.org>
> *Subject:*[DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
>
>
>

-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

