Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-06-10 Thread Dawid Wysakowicz
+1 to what Arvid said. I am also thinking we could even consider dropping the dispose method straightaway to make the need for migration obvious. I'd make that decision during the implementation/on the PR though, once we verify if the deprecation option works. Best, Dawid On 10/06/2021 09:37,

Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-06-10 Thread Arvid Heise
The whole operator API is only for advanced users and is not marked Public(Evolving). Users have to accept that things change and we have to use that freedom that we don't have in many other parts of the system. The change needs to be very clear in the change notes though. I also don't expect

Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-06-10 Thread Yun Gao
Hi all, Many thanks for the warm discussion! Regarding the change in the operator lifecycle, I also agree with adding the flush/drain/stopAndFlush/finish method. As for the duplication between this method and `endInput` for one-input operators, after some offline discussion with Dawid I now also

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-06-09 Thread Arvid Heise
Hi Piotr, I'm fine with just doing it on the Sink. My responses were focused on the API (the how) not on the concept (the if). Just keep the methods in the different places in sync, so that it is easy to introduce a common interface later. Re name: drain is not a reinvention as it's used quite

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-06-09 Thread Piotr Nowojski
Hi, Arvid: What's the problem with providing `void flush()`/`void drain()` only in the `SinkFunction`? It would avoid the problem of typing. Why would one need to have it in the other `Rich***Function`s? For `flush()` to make sense, the entity which has this method, would need to buffer some

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-06-09 Thread Arvid Heise
Hi Dawid, I see your point. I'd probably add drain only to Rich*Function where we have the type bounds. Then we still need your Flushable interface in Rich*Function<..., T> to call it efficiently but we at least avoid weird type combinations. I'll have a rethink later. The proper solution is

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-06-09 Thread Dawid Wysakowicz
Hey, @Arvid The problem with adding the "drain/flush/stopProcessing" method to RichFunction is that it is not typed with the output type. At the same time we would most likely need a way to emit records from the method. That's why I originally thought about adding a typed interface, which honestly I

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-06-09 Thread Arvid Heise
I have not followed the complete discussion and can't comment on the concepts. However, I have some ideas on the API changes: 1. If it's about adding additional life-cycle methods to UDFs, we should add the flush/endOfInput to RichFunction as this is the current way to define it. At this point, I

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-06-09 Thread Till Rohrmann
Thanks for the lively discussion everyone. I have to admit that I am not really convinced that we should call the interface Flushable and the method flush. The problem is that this method should in the first place tell the operator that it should stop processing and flush all buffered data. The

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-06-08 Thread Piotr Nowojski
Hi, Thanks for resuming this discussion. I think +1 for the proposal of dropping (deprecating) `dispose()`, and adding `flush()` to the `StreamOperator`/UDFs. Semantically, the new `close()` would be the equivalent of the old `dispose()`, and the old `close()` would be the equivalent of the new `flush() +
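The lifecycle split proposed in this message can be illustrated with a minimal sketch. It is not Flink code: `SketchOperator` and `LifecycleSketch` are hypothetical names, and since the message is cut off in this archive, the sketch assumes that `flush()` runs only when a task finishes gracefully, while `close()` always runs to release resources (the role the old `dispose()` played).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for an operator; this is not Flink's StreamOperator. */
interface SketchOperator {
    /** Emit any buffered results. Assumed to be called only on a graceful finish. */
    void flush();

    /** Release resources. Assumed to be called on every shutdown path. */
    void close();
}

final class LifecycleSketch {
    /** Runs the assumed shutdown sequence and returns the calls in the order they happened. */
    static List<String> shutdown(boolean graceful) {
        List<String> calls = new ArrayList<>();
        SketchOperator op = new SketchOperator() {
            @Override
            public void flush() {
                calls.add("flush");
            }

            @Override
            public void close() {
                calls.add("close");
            }
        };
        try {
            if (graceful) {
                // Only a task that ran to completion gets to emit its buffered data.
                op.flush();
            }
        } finally {
            // Cleanup happens regardless of how the task ended.
            op.close();
        }
        return calls;
    }
}
```

Under these assumptions a graceful finish calls `flush()` and then `close()`, while a failure or cancellation calls `close()` only.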

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-06-04 Thread Yun Gao
Hi all, Many thanks @Dawid for resuming the discussion and many thanks @Till for the summary! (And very sorry that I missed the mail and did not respond in time...) I also agree that we could consider the global commits separately later, after we have addressed the final checkpoints, and

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-06-03 Thread Dawid Wysakowicz
>> `notifyCheckpointComplete()`. This would be more difficult to implement, >> hence I would prefer "undefined" behaviour here, but it's probably possible. >> >> Very thanks for the further explanation, and I also totally agree with >> that the two are separate and

Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-03-29 Thread Till Rohrmann
ould still tend to we support the > ordered case, since the sinks' implementation seem to depend > on this functionality. > > Best, > Yun > > -- > From:Piotr Nowojski > Send Time:2021 Mar. 4 (Thu.) 22:56

Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-03-04 Thread Yun Gao
From:Piotr Nowojski Send Time:2021 Mar. 4 (Thu.) 22:56 To:Kezhu Wang Cc:Till Rohrmann ; Guowei Ma ; dev ; Yun Gao ; jingsongl...@gmail.com Subject:Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished Hi Yun and Kezhu, > 1. We might introduce a new

Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-03-04 Thread Piotr Nowojski
hTask or network io stack. >> > > * Or introducing stream task level `EndOfUserRecordsEvent`(from >> PR#14831 >> > > @Yun @Piotr) >> > > * Or special handling of `CheckpointType.SAVEPOINT_TERMINATE`. >> > >> > I also have similar concern

Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-03-04 Thread Kezhu Wang
putGate/InputChannel would be released after the > > downstream tasks have received > > EndOfPartitionEvent from all the input channels, this would makes the > > following checkpoint unable to > > perform since we could not emit barriers to downstream tasks ? > > >

Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-03-04 Thread Yun Gao
i Send Time:2021 Mar. 4 (Thu.) 17:16 To:Kezhu Wang Cc:dev ; Yun Gao ; jingsongl...@gmail.com ; Guowei Ma ; Till Rohrmann Subject:Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished Hi Kezhu, What do you mean by “end-flushing”? I was suggesting to just keep `endOfInput()

Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-03-04 Thread Piotr Nowojski
titionEvent is currently emitted in Task instead of > > StreamTask, we would need some > > refactors here. > > 2. Currently the InputGate/InputChannel would be released after the > > downstream tasks have received > > EndOfPartitionEvent from all the input channels, this would makes the > > following checkpoint unable to > > perform sin

Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-03-03 Thread Kezhu Wang
------------ > From:Kezhu Wang > Send Time:2021 Mar. 1 (Mon.) 01:26 > To:Till Rohrmann > Cc:Piotr Nowojski ; Guowei Ma < > guowei@gmail.com>; dev ; Yun Gao < > yungao...@aliyun.com>; jingsongl...@gmail.com > Subject:Re: Re: Re: [DISCUSS] FLIP-147: Supp

Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-03-02 Thread Piotr Nowojski
> > Best, > Yun > > > ---------- > From:Kezhu Wang > Send Time:2021 Mar. 1 (Mon.) 01:26 > To:Till Rohrmann > Cc:Piotr Nowojski ; Guowei Ma < > guowei@gmail.com>; dev ; Yun Gao < > yungao...@aliy

Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-02-28 Thread Yun Gao
he close() > happens > > at last, but it seems close() might also emit records ( > > so now the operator are closed with op1's close() -> op2's endOfInput() > -> > > op2's close() -> op3's endOfInput() -> ...) ? > > > > And on the other side, as Kezhu has also

Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-02-28 Thread Yun Gao
From:Kezhu Wang Send Time:2021 Mar. 1 (Mon.) 01:26 To:Till Rohrmann Cc:Piotr Nowojski ; Guowei Ma ; dev ; Yun Gao ; jingsongl...@gmail.com Subject:Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished In “stop-with-savepoint --drain”, MAX_WATERMARK is not an issue.

Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-02-28 Thread Kezhu Wang
might also emit records ( > > so now the operator are closed with op1's close() -> op2's endOfInput() > -> > > op2's close() -> op3's endOfInput() -> ...) ? > > > > And on the other side, as Kezhu has also proposed, perhaps we might > have > > the stop-with-save

Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-02-28 Thread Till Rohrmann
osal the close() > happens > > at last, but it seems close() might also emit records ( > > so now the operator are closed with op1's close() -> op2's endOfInput() > -> > > op2's close() -> op3's endOfInput() -> ...) ? > > > > And on the other side, a

Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-02-28 Thread Kezhu Wang
he operator would not > need to wait for other slow operators to exit. > > Best, > Yun > > > > --Original Mail -- > *Sender:*Kezhu Wang > *Send Date:*Thu Feb 25 15:11:53 2021 > *Recipients:*Flink Dev , Piotr Nowojski < > piotr.nowoj

Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-02-27 Thread Till Rohrmann
oint --drain could > be done with one savepoint, and for the normal exit, the operator would not > need to wait for other slow operators to exit. > > Best, > Yun > > > > --Original Mail -- > *Sender:*Kezhu Wang > *Send Date:*Thu Feb

Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-02-25 Thread Kezhu Wang
low operators to exit. Best, Yun --Original Mail -- *Sender:*Kezhu Wang *Send Date:*Thu Feb 25 15:11:53 2021 *Recipients:*Flink Dev , Piotr Nowojski < piotr.nowoj...@gmail.com> *CC:*Guowei Ma , jingsongl...@gmail.com < jingsongl...@gmail.com> *Subject:*Re

Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-02-25 Thread Yun Gao
ski CC:Guowei Ma , jingsongl...@gmail.com Subject:Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished Hi all, thanks for driving this and especially Piotr for re-activating this thread. First, for `notifyCheckpointComplete`, I have a strong preference towards "shut down the dat

Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-02-24 Thread Kezhu Wang
Hi all, thanks for driving this and especially Piotr for re-activating this thread. First, for `notifyCheckpointComplete`, I have a strong preference towards "shut down the dataflow pipeline with one checkpoint in total", so I lean towards the option of dropping "send records" from `notifyCheckpointComplete` for

Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-02-24 Thread Piotr Nowojski
Thanks for the responses Guowei and Yun, Could you elaborate more/remind me, what does it mean to replace emitting results from the `notifyCheckpointComplete` with `OperatorCoordinator` approach? About the discussion in FLINK-21133 and how it relates to FLIP-147. You are right Yun Gao, that in

Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-02-24 Thread Yun Gao
Hi Till, Guowei, Many thanks for initiating the discussion and for the deep thoughts! For notifyCheckpointComplete, I also agree we could try to avoid emitting new records in notifyCheckpointComplete by using OperatorCoordinator for the new sink API. Besides, the Hive sink might also need some

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-02-24 Thread Guowei Ma
Hi Till, Thank you very much for your careful consideration. *1. Emit records in `notifyCheckpointComplete`.* Sorry for the misunderstanding caused by my wording. I just wanted to say that the current interface does not prevent users from doing it. From the perspective of the new sink

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-02-23 Thread Till Rohrmann
Thanks for the explanation Yun and Guowei. I have to admit that I do not fully understand why this is strictly required but I think that we are touching two very important aspects which might have far-reaching consequences for how Flink works: 1) Do we want to allow that multiple checkpoints are

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-02-16 Thread Guowei Ma
Thanks Yun for the detailed explanation. A simple supplementary explanation about the sink case: Maybe we could use `OperatorCoordinator` to avoid sending the element to the downstream operator. But I agree we cannot prevent users from emitting records in `notifyCheckpointComplete`. Best,

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-02-15 Thread Yun Gao
Hi all, I'd like to first detail the issue with emitting records in notifyCheckpointComplete for context. As a concrete example, a sink might want to write some metadata after all the transactions are committed (for example, write a marker file _SUCCESS to the output
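The sink use case described in this message can be sketched roughly as follows. This is a simplified model under stated assumptions, not Flink's sink API: `MarkerSinkSketch`, `snapshot`, `endOfInput` and `output` are hypothetical names, transactions are plain strings, and "committing" just appends to an in-memory list. The point it illustrates is that the `_SUCCESS` marker can only be written from `notifyCheckpointComplete` once a checkpoint taken after the end of input has completed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical two-phase-commit sink that writes a marker once everything is committed. */
final class MarkerSinkSketch {
    // checkpointId -> transaction pre-committed at that checkpoint (hypothetical model)
    private final TreeMap<Long, String> pending = new TreeMap<>();
    private final List<String> output = new ArrayList<>();
    private boolean inputEnded;
    private boolean markerWritten;

    /** Pre-commit: remember the transaction that belongs to this checkpoint. */
    void snapshot(long checkpointId, String transaction) {
        pending.put(checkpointId, transaction);
    }

    /** Signals that no more records will arrive. */
    void endOfInput() {
        inputEnded = true;
    }

    /** Commit every transaction up to and including the completed checkpoint. */
    void notifyCheckpointComplete(long checkpointId) {
        // headMap returns a live view, so clearing it removes the entries from pending.
        NavigableMap<Long, String> done = pending.headMap(checkpointId, true);
        output.addAll(done.values());
        done.clear();
        // The marker is written only after the input ended and nothing is left to commit.
        if (inputEnded && pending.isEmpty() && !markerWritten) {
            output.add("_SUCCESS");
            markerWritten = true;
        }
    }

    List<String> output() {
        return output;
    }
}
```

In this model, if the task shut down right after `endOfInput()` without waiting for one more completed checkpoint, the last transaction would stay uncommitted and the marker would never be written.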

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-02-12 Thread Arvid Heise
Hi Piotr, Thank you for raising your concern. Unfortunately, I do not have a better idea than doing closing of operators intermittently with checkpoints (= multiple last checkpoints). However, two ideas on how to improve the overall user experience: 1. If an operator is not relying on

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-02-12 Thread Piotr Nowojski
Hey, I would like to raise a concern about implementation of the final checkpoints taking into account operators/functions that are implementing two phase commit (2pc) protocol for exactly-once processing with some external state (kept outside of Flink). Primarily exactly-once sinks. First

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-15 Thread Yun Gao
Hi Aljoscha, I think so, since we do not seem to have any other disagreements or new objections now. I'll open the vote then. Many thanks! Best, Yun -- From:Aljoscha Krettek Send Time:2021 Jan. 15 (Fri.) 21:24 To:dev Subject:Re:

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-15 Thread Aljoscha Krettek
Thanks for the summary! I think we can now move towards a [VOTE] thread, right? On 2021/01/15 13:43, Yun Gao wrote: 1) For the problem that the "new" root task coincidentally finished before getting triggered successfully, we have listed two options in the FLIP-147[1], for the first version,

Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-14 Thread Yun Gao
ksFinished-TriggeringCheckpointsAfterTasksFinished -- From:Yun Gao Send Time:2021 Jan. 13 (Wed.) 16:09 To:dev ; user Subject:Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished Hi all, I updated the FLIP[1] to reflect the major

Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-13 Thread Yun Gao
foreFinish -- From:Yun Gao Send Time:2021 Jan. 12 (Tue.) 10:30 To:Khachatryan Roman Cc:dev ; user Subject:Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished Hi Roman, Very thanks for the

Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-11 Thread Yun Gao
r waiting for the flush of pipeline result partition. Glad that we have the same viewpoints on this issue. :) Best, Yun -- From:Khachatryan Roman Send Time:2021 Jan. 11 (Mon.) 19:14 To:Yun Gao Cc:dev ; user Subject:

Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-11 Thread Khachatryan Roman
imization and > postpone it to future versions ? > > Best, > Yun > > > > ---------- > From:Khachatryan Roman > Send Time:2021 Jan. 11 (Mon.) 05:46 > To:Yun Gao > Cc:Arvid Heise ; dev ; us

Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-11 Thread Yun Gao
SCUSS] FLIP-147: Support Checkpoints After Tasks Finished Thanks a lot for your answers Yun, > In detail, suppose we have a job with the graph A -> B -> C, suppose in one > checkpoint A has reported FINISHED, CheckpointCoordinator would > choose B as the new "source" t
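The selection rule quoted in this message (with A finished in A -> B -> C, B becomes the new "source") can be sketched as below. It is an illustration only, not the CheckpointCoordinator implementation: `TriggerSelectionSketch`, `tasksToTrigger` and `chainABC` are hypothetical names, tasks are plain strings, and the rule assumed here is "trigger every unfinished task whose upstream tasks have all finished".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of choosing which tasks receive the checkpoint trigger. */
final class TriggerSelectionSketch {

    /**
     * Returns the unfinished tasks whose upstream tasks have all finished.
     * A real source has no upstream tasks, so it qualifies while it is running.
     */
    static List<String> tasksToTrigger(Map<String, List<String>> upstream, Set<String> finished) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : upstream.entrySet()) {
            String task = entry.getKey();
            if (finished.contains(task)) {
                continue; // finished tasks are never triggered
            }
            if (finished.containsAll(entry.getValue())) {
                result.add(task); // no running upstream task can deliver a barrier
            }
        }
        return result;
    }

    /** The example graph A -> B -> C, expressed as task -> upstream tasks. */
    static Map<String, List<String>> chainABC() {
        Map<String, List<String>> upstream = new LinkedHashMap<>();
        upstream.put("A", Collections.<String>emptyList());
        upstream.put("B", Arrays.asList("A"));
        upstream.put("C", Arrays.asList("B"));
        return upstream;
    }
}
```

With nothing finished, only A is selected; once A has reported FINISHED, B is selected, which matches the example in the quoted text. The race discussed elsewhere in the thread (a selected task finishing before the trigger arrives) is deliberately outside this sketch.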

Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-10 Thread Khachatryan Roman
is called > with > EndOfPartition) and then taking snapshot for the input channels, as the > normal unaligned checkpoints does for the InputChannel side. Then > we would be able to ensure the finished tasks always have an empty state. > > I'll also optimize the FLIP to make it mo

Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-07 Thread Yun Gao
Hi Roman, Many thanks for the feedback! I'll try to answer the issues inline: > 1. Option 1 is said to be not preferable because it wastes resources and adds > complexity (new event). > However, the resources would be wasted for a relatively short time until the > job finishes completely.

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-07 Thread Khachatryan Roman
Thanks for starting this discussion (and sorry for probably duplicated questions, I couldn't find them answered in FLIP or this thread). 1. Option 1 is said to be not preferable because it wastes resources and adds complexity (new event). However, the resources would be wasted for a relatively

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-07 Thread Arvid Heise
> > We could introduce an interface, sth like `RequiresFinalization` or > `FinalizationListener` (all bad names). The operator itself knows when > it is ready to completely shut down, Async I/O would wait for all > requests, sink would potentially wait for a given number of checkpoints. > The

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-07 Thread Aljoscha Krettek
This is somewhat unrelated to the discussion about how to actually do the triggering when sources shut down, I'll write on that separately. I just wanted to get this quick thought out. For letting operators decide whether they actually want to wait for a final checkpoint, which is relevant at

Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-06 Thread Yun Gao
Hi Arvid, Many thanks for the deep thoughts! > If this somehow works, we would not need to change much in the checkpoint > coordinator. He would always inject into sources. We could also ignore the > race conditions as long as the TM lives. Checkpointing times are also not > worse as with the

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-06 Thread Arvid Heise
Okay then at least you guys are in sync ;) (Although I'm also not too far away) I hope I'm not super derailing but could we reiterate why it's good to get rid of finished tasks (note: I'm also mostly in favor of that): 1. We can free all acquired resources including buffer pools, state

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-06 Thread Aljoscha Krettek
On 2021/01/06 16:05, Arvid Heise wrote: thanks for the detailed example. It feels like Aljoscha and you are also not fully aligned yet. For me, it sounded as if Aljoscha would like to avoid sending RPC to non-source subtasks. No, I think we need the triggering of intermediate operators. I was

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-06 Thread Aljoscha Krettek
On 2021/01/06 13:35, Arvid Heise wrote: I was actually not thinking about concurrent checkpoints (and actually want to get rid of them once UC is established, since they are addressing the same thing). I would give a yuge +1 to that. I don't see why we would need concurrent checkpoints in

Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-06 Thread Arvid Heise
s. > Of course this is one possible implementation and we might have other > solutions to this problem. Do you think the process would still have some > problems? > > > However, that would > > require subtasks to stay alive until they receive checkpointCompleted > > callback (which is

Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-06 Thread Yun Gao
Hi Arvid, Many thanks for the feedback! I'll try to answer the questions inline: > I'm also concerned about the notion of a final checkpoint. What happens > when this final checkpoint times out (checkpoint timeout > async timeout) > or fails for a different reason? I'm currently more inclined

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-06 Thread Arvid Heise
I was actually not thinking about concurrent checkpoints (and actually want to get rid of them once UC is established, since they are addressing the same thing). But your explanation definitely helped me to better understand the race condition. However, I have the impression that you think

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-06 Thread Aljoscha Krettek
On 2021/01/06 11:30, Arvid Heise wrote: I'm assuming that this is the normal case. In a A->B graph, as soon as A finishes, B still has a couple of input buffers to process. If you add backpressure or longer pipelines into the mix, it's quite likely that a checkpoint may occur with B being the

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-06 Thread Arvid Heise
> > I was referring to the case where intermediate operators don't have any > active upstream (input) operators. In that case, they basically become > the "source" of that part of the graph. In your example, M1 is still > connected to a "real" source. I'm assuming that this is the normal case.

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-06 Thread Aljoscha Krettek
On 2021/01/05 17:27, Arvid Heise wrote: For your question: will there ever be intermediate operators that should be running that are not connected to at least one source? I think there are plenty of examples if you go beyond chained operators and fully connected exchanges. Think of any fan-in,

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-06 Thread Yun Gao
Hi Arvid, Many thanks for the feedback! > For 2) the race condition, I was more thinking of still injecting the > barrier at the source in all cases, but having some kind of short-cut to > immediately execute the RPC inside the respective taskmanager.

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-05 Thread Arvid Heise
For 2) the race condition, I was more thinking of still injecting the barrier at the source in all cases, but having some kind of short-cut to immediately execute the RPC inside the respective taskmanager. However, that may prove hard in case of dynamic scale-ins. Nevertheless, because of this

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-05 Thread Yun Gao
Hi Aljoscha, Many thanks for the feedback! For the second issue, I was indeed thinking of the race condition between deciding to trigger and the operator finishing. And for this point, > One thought here is this: will there ever be intermediate operators that > should be

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-05 Thread Yun Gao
Hi Arvid, Many thanks for the feedback! For the second issue, sorry, I think I did not make it very clear. I was initially thinking of the case where, for example, for a job with graph A -> B -> C, when we compute which tasks to trigger, A is still running, so we trigger A

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-05 Thread Aljoscha Krettek
On 2021/01/05 10:16, Arvid Heise wrote: 1. I'd think that this is an orthogonal issue, which I'd solve separately. My gut feeling says that this is something we should only address for new sinks where we decouple the semantics of commits and checkpoints anyways. @Aljoscha Krettek any idea on

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-05 Thread Arvid Heise
Hi Yun, 1. I'd think that this is an orthogonal issue, which I'd solve separately. My gut feeling says that this is something we should only address for new sinks where we decouple the semantics of commits and checkpoints anyways. @Aljoscha Krettek any idea on this one? 2. I'm not sure I get it

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-12-24 Thread Yun Gao
Hi all, I tested the previous PoC with the current tests and found some new issues that might cause disagreement; sorry that this may also reopen some previously discussed problems: 1. Which operators should wait for one more checkpoint before closing? One

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-12-15 Thread Yun Gao
Hi Aljoscha, Many thanks for the feedback! For the remaining issues: > 1. You mean we would insert "artificial" barriers for barrier 2 in case we receive EndOfPartition while other inputs have already received barrier 2? I think that makes sense, yes. Yes, exactly,

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-12-15 Thread Aljoscha Krettek
Thanks for the thorough update! I'll answer inline. On 14.12.20 16:33, Yun Gao wrote: 1. To include EndOfPartition into consideration for barrier alignment at the TM side, we now tend to decouple the logic for EndOfPartition with the normal alignment behaviors to avoid the complex

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-12-14 Thread Yun Gao
Hi all, I would like to resume this discussion on supporting checkpoints after tasks finished :) Based on the previous discussion, we have now implemented a PoC [1] to try out the idea. During the PoC we also ran into some possible issues: 1. To include EndOfPartition into

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-10-14 Thread Yun Gao
Hi Till, Many thanks for the feedback! > 1) When restarting all tasks independent of the status at checkpoint time > (finished, running, scheduled), we might allocate more resources than we > actually need to run the remaining job. From a scheduling perspective it > would be easier if we

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-10-13 Thread Till Rohrmann
Thanks for starting this discussion Yun Gao, I have three comments/questions: 1) When restarting all tasks independent of the status at checkpoint time (finished, running, scheduled), we might allocate more resources than we actually need to run the remaining job. From a scheduling perspective

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-10-13 Thread Yun Gao
Hi Arvid, Many thanks for the comments! >>> 4) Yes, the interaction is not trivial and also I have not completely >>> thought it through. But in general, I'm currently at the point where I >>> think that we also need non-checkpoint related events in unaligned >>> checkpoints. So just keep that

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-10-12 Thread Arvid Heise
Hi Yun, 4) Yes, the interaction is not trivial and also I have not completely thought it through. But in general, I'm currently at the point where I think that we also need non-checkpoint related events in unaligned checkpoints. So just keep that in mind, that we might converge anyhow at this

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-10-12 Thread Yun Gao
Hi Arvid, Many thanks for the insightful comments! I added my responses to each issue below the quotes: >> 1) You call the tasks that get the barriers injected leaf nodes, which would >> make the sinks the root nodes. That is very similar to how graphs in >> relational algebra are labeled.

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-10-12 Thread Arvid Heise
Hi Yun, Thank you for starting the discussion. This will solve one of the long-standing issues [1] that confuse users. I'm also a big fan of option 3. It is also a bit closer to Chandy-Lamport again. A couple of comments: 1) You call the tasks that get the barriers injected leaf nodes, which

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-10-09 Thread Yun Gao
Hi, devs & users Very sorry for the broken formatting; I am resending the discussion as follows. As discussed in FLIP-131[1], Flink will make DataStream the unified API for processing bounded and unbounded data in both streaming and blocking modes. However, one long-standing problem for the streaming