Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-27 Thread Steven Wu
It is more about extended outages of the metastore. E.g., if we commit every 2 minutes, 4 hours of metastore outage can lead to over 120 GlobalCommT. And regarding metastore outages, it is undesirable for streaming jobs to fail and keep restarting. It is better to keep processing records
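The backlog arithmetic above can be sketched as follows. This is only an illustration: `BacklogSketch` and `pendingAfterOutage` are hypothetical names, and the sketch assumes that exactly one GlobalCommT is produced per commit interval and that none can be committed while the metastore is down.

```java
import java.time.Duration;

/** Hypothetical helper; not part of Flink or of the FLIP-143 proposal. */
final class BacklogSketch {

    /**
     * Number of committables handed to the first commit after an outage,
     * assuming one committable per interval and no successful commit during the outage.
     */
    static long pendingAfterOutage(Duration outage, Duration commitInterval) {
        // Committables that accumulated while the metastore was unreachable.
        long missed = outage.toMillis() / commitInterval.toMillis();
        // Plus the newest committable that triggers the next commit attempt.
        return missed + 1;
    }

    public static void main(String[] args) {
        // 4 hours of outage with a 2-minute commit interval: 120 missed + 1 new.
        System.out.println(pendingAfterOutage(Duration.ofHours(4), Duration.ofMinutes(2))); // prints 121
    }
}
```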

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-26 Thread Guowei Ma
Hi Steven, thank you very much for your detailed explanation. Now I get your point: I can see that there are benefits to committing a collection of `GlobalCommT` as a whole when the external metastore environment is unstable at times. But I have two small concerns about introducing

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-25 Thread Steven Wu
I should clarify my last email a little more. For the example where commits for checkpoints 1-100 failed, the job is still up (processing records and uploading files). When the commit for checkpoint 101 comes, IcebergSink would prefer the framework to pass in all 101 GlobalCommT (100 old + 1 new) so that
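The rollover described above might look roughly like the sketch below. Everything in it is an assumption made for illustration: `CommitResult`, `GlobalCommitter` and `PendingCommits` are hypothetical stand-ins, not the interfaces from the FLIP wiki page, and the sketch takes `RETRY` to mean that the whole list is offered again on the next attempt.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch; not the FLIP-143 interfaces. */
final class RetryWholeListSketch {

    enum CommitResult { SUCCESS, RETRY, FAILURE }

    /** Hypothetical stand-in for the global committer discussed in the thread. */
    interface GlobalCommitter<GlobalCommT> {
        CommitResult commit(List<GlobalCommT> committables);
    }

    /** Hypothetical framework-side bookkeeping of committables that are not yet committed. */
    static final class PendingCommits<GlobalCommT> {
        private final List<GlobalCommT> pending = new ArrayList<>();

        /** Adds the newest committable, then offers the whole backlog to the committer. */
        CommitResult onCheckpointComplete(GlobalCommT latest, GlobalCommitter<GlobalCommT> committer) {
            pending.add(latest);
            CommitResult result = committer.commit(new ArrayList<>(pending));
            if (result == CommitResult.SUCCESS) {
                pending.clear();
            }
            // On RETRY nothing is removed, so the whole list is passed in again next time.
            // What to do on FAILURE is left to the caller in this sketch.
            return result;
        }

        int size() {
            return pending.size();
        }
    }
}
```

With this shape, a committer that returns `RETRY` for checkpoints 1-100 would receive all 101 committables in a single call once checkpoint 101 completes.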

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-25 Thread Steven Wu
> 1. The frame can not know which `GlobalCommT` to retry if we use the > List as parameter when the `commit` returns `RETRY`. > 2. Of course we can let the `commit` return more detailed info but it might > be too complicated. If commit(List) returns RETRY, it means the whole list needs to be

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-25 Thread Guowei Ma
Hi, all From the above discussion we could find that the FLIP focuses on providing a unified transactional sink API. So I updated the FLIP's title to "Unified Transactional Sink API". But I found that the old link can no longer be opened. I will post the updated link[1] here. Sorry for the

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-25 Thread Guowei Ma
Hi, Steven >>I also have a clarifying question regarding the WriterStateT. Since >>IcebergWriter won't need to checkpoint any state, should we set it to *Void* >>type? Since getWriterStateSerializer() returns Optional, that is clear and >>we can return Optional.empty(). Yes I think you could do

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-25 Thread Guowei Ma
Hi, Steven Thank you for reading the FLIP so carefully. 1. The framework cannot know which `GlobalCommT` to retry if we use the List as parameter when the `commit` returns `RETRY`. 2. Of course we can let the `commit` return more detailed info but it might be too complicated. 3. On the other hand, I

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-24 Thread Steven Wu
Guowei, Thanks a lot for updating the wiki page. It looks great. I noticed one inconsistency in the wiki with your last summary email for GlobalCommitter interface. I think the version in the summary email is the intended one, because rollover from previous failed commits can accumulate a list.

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-23 Thread Guowei Ma
Thanks Aljoscha for your suggestion. I have updated FLIP. Any comments are welcome. Best, Guowei On Wed, Sep 23, 2020 at 4:25 PM Aljoscha Krettek wrote: > Yes, that sounds good! I'll probably have some comments on the FLIP > about the names of generic parameters and the Javadoc but we can

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-23 Thread Aljoscha Krettek
Yes, that sounds good! I'll probably have some comments on the FLIP about the names of generic parameters and the Javadoc but we can address them later or during implementation. I also think that we probably need the FAIL,RETRY,SUCCESS result for globalCommit() but we can also do that as a

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-22 Thread Guowei Ma
Hi, all Thank you all very much for your ideas and suggestions. I will try to summarize the consensus again :). Correct me if I am wrong or have misunderstood you. ## Consensus-1 1. The motivation of the unified sink API is to decouple the sink implementation from the different runtime execution

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-22 Thread Guowei Ma
>> I think we should go with something like >> List filterRecoveredCommittables(List<>) >> to keep things simple. This should also be easy to do from the framework >> side and then the sink doesn't need to do any custom state handling. I second Aljoscha's proposal. For the first version there

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-22 Thread Aljoscha Krettek
I think we should go with something like List filterRecoveredCommittables(List<>) to keep things simple. This should also be easy to do from the framework side and then the sink doesn't need to do any custom state handling. Best, Aljoscha On 22.09.20 16:03, Steven Wu wrote: Previous APIs
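A minimal sketch of how a sink could implement the proposed `filterRecoveredCommittables` hook. The generic parameters, the `Committable` class with a sink-generated `id`, and the `alreadyCommittedIds` set (a stand-in for a lookup against the external system) are all assumptions made for illustration; the thread only gives the method name and the List-in, List-out shape.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch; names other than filterRecoveredCommittables are invented. */
final class FilterRecoveredSketch {

    /** Hypothetical committable carrying an id generated by the sink. */
    static final class Committable {
        final String id;

        Committable(String id) {
            this.id = id;
        }
    }

    /** Hypothetical slice of a global committer interface. */
    interface GlobalCommitter<GlobalCommT> {
        /** Called once on restore; returns the committables that still need to be committed. */
        List<GlobalCommT> filterRecoveredCommittables(List<GlobalCommT> recovered);
    }

    static final class DedupingCommitter implements GlobalCommitter<Committable> {
        // Stand-in for ids that the external system reports as already committed.
        private final Set<String> alreadyCommittedIds;

        DedupingCommitter(Set<String> alreadyCommittedIds) {
            this.alreadyCommittedIds = alreadyCommittedIds;
        }

        @Override
        public List<Committable> filterRecoveredCommittables(List<Committable> recovered) {
            List<Committable> stillPending = new ArrayList<>();
            for (Committable c : recovered) {
                if (!alreadyCommittedIds.contains(c.id)) {
                    stillPending.add(c);
                }
            }
            return stillPending;
        }
    }
}
```

Doing the check in this hook means it runs once per restore rather than once per checkpoint cycle, which is the concern raised about placing it in `commit`.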

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-22 Thread Steven Wu
Previous APIs discussed have been trying to do more in the framework. If we take a different approach to a lighter framework, these sets of minimal APIs are probably good enough. The sink can handle the bookkeeping, merge, and retry logic. /** * CommT is the DataFile in Iceberg * GlobalCommT is the

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-22 Thread Steven Wu
It is fine to leave the CommitResult/RETRY outside the scope of the framework. Then the framework might need to provide some hooks in the checkpoint/restore logic. Because the commit happens in the post-checkpoint-completion step, the sink needs to update the internal state when the commit is successful

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-22 Thread Aljoscha Krettek
On 22.09.20 13:26, Guowei Ma wrote: Actually I am not sure adding `isAvailable` is enough. Maybe it is not. But for the initial version I hope we could make the sink api sync because there is already a lot of stuff that has to finish. :--) I agree, for the first version we should stick to a

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-22 Thread Guowei Ma
>> I believe that we could support such an async sink writer >> very easily in the future. What do you think? >> How would you see the expansion in the future? Do you mean just adding `isAvailable()` method with a default implementation later on? Hi @piotr Actually I am not sure adding

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-22 Thread Aljoscha Krettek
On 22.09.20 11:10, Guowei Ma wrote: 1. I think maybe we could add a EOI interface to the `GlobalCommit`. So that we could make `write success file` be available in both batch and stream execution mode. We could, yes. I'm now hesitant because we're adding more things but I think it should be

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-22 Thread Guowei Ma
Thanks @aljoscha for the summary. I agree we should postpone the discussion of the sink topology and focus on the normal file sink and IcebergSink in Flink 1.12. I have three small questions: 1. I think maybe we could add an EOI interface to the `GlobalCommit`. So that we could make `write

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-22 Thread Aljoscha Krettek
Ah sorry, I think I now see what you mean. I think it's ok to add a `List recoverCommittables(List)` method. On 22.09.20 09:42, Aljoscha Krettek wrote: On 22.09.20 06:06, Steven Wu wrote: In addition, it is undesirable to do the committed-or-not check in the commit method, which happens for

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-22 Thread Aljoscha Krettek
On 22.09.20 06:06, Steven Wu wrote: In addition, it is undesirable to do the committed-or-not check in the commit method, which happens for each checkpoint cycle. CommitResult already indicates SUCCESS or not. When the framework calls commit with a list of GlobalCommittableT, it should be certain

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-21 Thread Steven Wu
Aljoscha/Guowei, I think we are pretty close to aligning on the Iceberg sink requirements. This new sink API can really benefit and simplify the Iceberg sink implementation. Looking forward to the initial scope with the 1.12 release. > CommitResult commit(GlobalCommittableT); I like the

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-21 Thread Aljoscha Krettek
Hi all, I'll try and summarize my thoughts after Guowei, Yun, Kostas, Dawid, and I had an offline discussion about this. Also, I would like to give credit to Guowei for initially coming up with the idea of a topology sink in the context of this discussion. I think it's a good idea and we

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-21 Thread Piotr Nowojski
Hi Guowei, > I believe that we could support such an async sink writer > very easily in the future. What do you think? How would you see the expansion in the future? Do you mean just adding `isAvailable()` method with a default implementation later on? Piotrek On Mon, 21 Sep 2020 at 02:39, Steven
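One possible shape of "adding `isAvailable()` with a default implementation later on" is sketched below. The `Writer` interface here is a hypothetical stand-in, and the `CompletableFuture<Void>` return type is an assumption; the thread only names the method.

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch; not the Writer interface from the FLIP. */
final class AvailabilitySketch {

    interface Writer<T> {
        void write(T element);

        /** Default: always available, so existing synchronous writers need no change. */
        default CompletableFuture<Void> isAvailable() {
            return CompletableFuture.completedFuture(null);
        }
    }

    /** A writer with a bounded client buffer that reports unavailability while the buffer is full. */
    static final class BoundedWriter implements Writer<String> {
        private final int capacity;
        private final ArrayDeque<String> buffer = new ArrayDeque<>();
        private CompletableFuture<Void> available = CompletableFuture.completedFuture(null);

        BoundedWriter(int capacity) {
            this.capacity = capacity;
        }

        @Override
        public void write(String element) {
            buffer.add(element);
            if (buffer.size() >= capacity && available.isDone()) {
                // Buffer is full: hand out a future that completes once space frees up.
                available = new CompletableFuture<>();
            }
        }

        /** Simulates the external client sending one buffered element. */
        void drainOne() {
            buffer.poll();
            if (buffer.size() < capacity) {
                available.complete(null);
            }
        }

        @Override
        public CompletableFuture<Void> isAvailable() {
            return available;
        }
    }
}
```

A caller would check `isAvailable().isDone()` before calling `write` and otherwise wait on the future instead of blocking inside the sink.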

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-20 Thread Steven Wu
> I think Iceberg sink needs to do the dedup in the `commit` call. The `recoveredGlobalCommittables` is just for restoring the ids. @Guowei Ma It is undesirable to do the dedup check in the `commit` call, because it happens for each checkpoint cycle. We only need to do the de-dup check one

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-20 Thread Guowei Ma
I would like to summarize the file type sink in the thread and their possible topologies. I will also try to give the pros and cons of every topology option. Correct me if I am wrong. ### FileSink Topology Option: TmpFileWriter + Committer. ### Iceberg Sink Topology Option1: `DataFileWriter` +

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-20 Thread Guowei Ma
Hi, Steven I want to make a clarification first: the following reply only considers the Iceberg sink and does not consider other sinks. Before making a decision we should consider all the sinks. I will try to summarize all the sink requirements in the next mail. >> run global committer in jobmanager

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-19 Thread Steven Wu
> I prefer to let the developer produce id to dedupe. I think this gives the developer more opportunity to optimize. Thinking about it again, I totally agree with Guowei on this. We don't really need the framework to generate the unique id for Iceberg sink. De-dup logic is totally internal to

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-19 Thread Guowei Ma
Hi, all >>Just to add to what Aljoscha said regarding the unique id. Iceberg sink >>checkpoints the unique id into state during snapshot. It also inserts the >>unique id into the Iceberg snapshot metadata during commit. When a job >>restores the state after failure, it needs to know if the

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-18 Thread Steven Wu
Aljoscha, > Instead the sink would have to check for each set of committables separately if they had already been committed. Do you think this is feasible? Yes, that is how it works in our internal implementation [1]. We don't use checkpointId. We generate a manifest file (GlobalCommT) to bundle

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-18 Thread Aljoscha Krettek
Steven, we were also wondering if it is a strict requirement that "later" updates to Iceberg subsume earlier updates. In the current version, you only check whether checkpoint X made it to Iceberg and then discard all committable state from Flink state for checkpoints smaller than X. If we go

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-17 Thread Steven Wu
Guowei Just to add to what Aljoscha said regarding the unique id. Iceberg sink checkpoints the unique id into state during snapshot. It also inserts the unique id into the Iceberg snapshot metadata during commit. When a job restores the state after failure, it needs to know if the restored

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-17 Thread Aljoscha Krettek
Thanks for the summary! On 16.09.20 06:29, Guowei Ma wrote: ## Consensus 1. The motivation of the unified sink API is to decouple the sink implementation from the different runtime execution modes. 2. The initial scope of the unified sink API only covers the file system type, which supports the

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-17 Thread Piotr Nowojski
Hi Guowei, Thanks for the explanation. Now I get your point. Basically any action that would make the sink unavailable would also cause it to block on snapshotting the state (in option 1. with flushing). I agree that lack of availability is much less of an issue than I had thought. For

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-16 Thread Guowei Ma
Hi, Steven I am not particularly sure whether to provide id in GlobalCommit. But my understanding is: if the committer function is idempotent, the framework can guarantee exactly once semantics in batch/stream execution mode. But I think maybe the idempotence should be guaranteed by the sink

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-16 Thread Guowei Ma
Thank you very much, @piotr, for your patient explanation. I will try to explain what I have in mind. Consider the following case: FlinkSink E6 -> Client Buffer Queue |E5|E4|E3|E2| --> External System E1 When the FlinkSink cannot add E6 to the queue, maybe the external client is

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-16 Thread Steven Wu
Guowei, thanks a lot for the summary. Here are a couple more questions that need more clarification for the GlobalCommitter case. * framework provides some sort of unique id per GlobalCommT (e.g. nonce or some sort of transaction id) * commit failure handling. Should we roll over to the next

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-16 Thread Piotr Nowojski
Hey Thanks Dawid for bringing up my suggestion :) > I'm not so sure about this, the sinks I'm aware of would not be able to > implement this method: Kafka doesn't have this, I didn't see it in the > Iceberg interfaces, and HDFS/S3 also don't have it. Aljoscha, as I wrote, FlinkKafkaProducer is

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-15 Thread Guowei Ma
Hi, all Thanks for all your valuable opinions and ideas. Currently there are many topics in the mail. I will try to summarize what is consensus and what is not. Correct me if I am wrong. ## Consensus 1. The motivation of the unified sink API is to decouple the sink implementation from the different

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-15 Thread Guowei Ma
Hi, Steven Thank you for your thoughtful ideas and concerns. >>I still like the concept of grouping data files per checkpoint for streaming mode. it is cleaner and probably easier to manage and deal with commit failures. Plus, it >>can reduce dupes for the at least once >>mode. I understand

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-15 Thread Steven Wu
> AFAIK the committer would not see the file-1-2 when ck1 happens in the ExactlyOnce mode. @Guowei Ma I think you are right for exactly once checkpoint semantics. what about "at least once"? I guess we can argue that it is fine to commit file-1-2 for at least once mode. I still like the concept

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-15 Thread Steven Wu
> images don't make it through to the mailing lists. You would need to host the file somewhere and send a link. Sorry about that. Here is the sample DAG in google drawings. https://docs.google.com/drawings/d/1-P8F2jF9RG9HHTtAfWEBRuU_2uV9aDTdqEt5dLs2JPk/edit?usp=sharing On Tue, Sep 15, 2020 at

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-15 Thread Guowei Ma
Hi, Dawid >>I still find the merging case the most confusing. I don't necessarily understand why do you need the "SingleFileCommit" step in this scenario. The way I >> understand "commit" operation is that it makes some data/artifacts visible to the external system, thus it should be immutable

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-15 Thread Aljoscha Krettek
On 15.09.20 09:55, Dawid Wysakowicz wrote: BTW Let's not forget about Piotr's comment. I think we could add the isAvailable or similar method to the Writer interface in the FLIP. I'm not so sure about this, the sinks I'm aware of would not be able to implement this method: Kafka doesn't have

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-15 Thread Dawid Wysakowicz
> What I understand is that HiveSink's implementation might need the local > committer(FileCommitter) because the file rename is needed. > But the iceberg only needs to write the manifest file. Would you like to > enlighten me why the Iceberg needs the local committer? > Thanks Sorry if I caused

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-15 Thread Aljoscha Krettek
On 15.09.20 06:05, Guowei Ma wrote: ## Using checkpointId In the batch execution mode there would be no normal checkpoint any more. That is why we do not introduce the checkpoint id in the API. So it is a great thing that sink decouples its implementation from checkpointid. :) Yes, this is a

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-15 Thread Aljoscha Krettek
On 15.09.20 01:33, Steven Wu wrote: ## concurrent checkpoints @Aljoscha Krettek regarding the concurrent checkpoints, let me illustrate with a simple DAG below. [image: image.png] Hi Steven, images don't make it through to the mailing lists. You would need to host the file somewhere and

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-15 Thread Guowei Ma
>> I would think that we only need flush() and the semantics are that it >> prepares for a commit, so on a physical level it would be called from >> "prepareSnapshotPreBarrier". Now that I'm thinking about it more I >> think flush() should be renamed to something like "prepareCommit()". >

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-14 Thread Guowei Ma
Hi, aljoscha >I don't understand why we need the "Drain and Snapshot" section. It >seems to be some details about stop-with-savepoint and drain, and the >relation to BATCH execution but I don't know if it is needed to >understand the rest of the document. I'm happy to be wrong here, though, >if

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-14 Thread Guowei Ma
## Concurrent checkpoints AFAIK the committer would not see the file-1-2 when ck1 happens in the ExactlyOnce mode. ## Committable bookkeeping and combining I agree with you that the "CombineGlobalCommitter" would work. But we put more optimization logic in the committer, which will make the

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-14 Thread Steven Wu
## concurrent checkpoints @Aljoscha Krettek regarding the concurrent checkpoints, let me illustrate with a simple DAG below. [image: image.png] Let's assume each writer emits one file per checkpoint cycle and *writer-2 is slow*. Now let's look at the timeline of what the global committer receives:

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-14 Thread Guowei Ma
Hi all, Many thanks for the discussion and the valuable opinions! Currently there are several ongoing issues and we would like to show what we are thinking in the next few mails. It seems that the biggest issue now is about the topology of the sinks. Before deciding what the sink API would look

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-14 Thread Dawid Wysakowicz
Hi all, > I would think that we only need flush() and the semantics are that it > prepares for a commit, so on a physical level it would be called from > "prepareSnapshotPreBarrier". Now that I'm thinking about it more I > think flush() should be renamed to something like "prepareCommit()".

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-14 Thread Aljoscha Krettek
I thought about this some more. One of the important parts of the Iceberg sink is to know whether we have already committed some DataFiles. Currently, this is implemented by writing a (JobId, MaxCheckpointId) tuple to the Iceberg table when committing. When restoring from a failure we check
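The restore-time check built on a stored max checkpoint id might be sketched as below. This is an assumption-heavy illustration: it presumes the `MaxCheckpointId` recorded for this job has already been read from the table (how that is done is out of scope here), that restored committables are keyed by checkpoint id, and that everything up to and including the recorded id can be dropped. `CheckpointDedupSketch` and `stillPending` are hypothetical names.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical sketch; not taken from the Iceberg sink implementation. */
final class CheckpointDedupSketch {

    /**
     * Returns the restored committables that still need committing: those belonging to
     * checkpoints strictly newer than the max checkpoint id already recorded in the table.
     */
    static <T> NavigableMap<Long, T> stillPending(
            NavigableMap<Long, T> restoredByCheckpointId, long maxCommittedCheckpointId) {
        // tailMap(key, false) keeps only entries with a checkpoint id greater than the key.
        return new TreeMap<>(restoredByCheckpointId.tailMap(maxCommittedCheckpointId, false));
    }
}
```

Note that this relies on later commits subsuming earlier ones, so a single id comparison is enough to decide what to discard.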

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-14 Thread Aljoscha Krettek
On 14.09.20 01:23, Steven Wu wrote: ## Writer interface For the Writer interface, should we add "*prepareSnapshot*" before the checkpoint barrier is emitted downstream? IcebergWriter would need it. Or would the framework call "*flush*" before the barrier is emitted downstream? That guarantee would

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-14 Thread Piotr Nowojski
Hi, I've just briefly skimmed over the proposed interfaces. I would suggest one addition to the Writer interface (as I understand this is the runtime interface in this proposal?): add some availability method, to avoid, if possible, blocking calls on the sink. We already have similar availability

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-13 Thread Steven Wu
Aljoscha, thanks a lot for the detailed response. Now I have a better understanding of the initial scope. To me, there are possibly three different committer behaviors. For lack of better names, let's call them * No/NoopCommitter * Committer / LocalCommitter (file sink?) * GlobalCommitter

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-11 Thread Aljoscha Krettek
Regarding the FLIP itself, I like the motivation section and the What/How/When/Where section a lot! I don't understand why we need the "Drain and Snapshot" section. It seems to be some details about stop-with-savepoint and drain, and the relation to BATCH execution but I don't know if it is

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-11 Thread Aljoscha Krettek
Hi Everyone, thanks to Guowei for publishing the FLIP, and thanks Steven for the very thoughtful email! We thought a lot internally about some of the questions you posted but left a lot (almost all) of the implementation details out of the FLIP for now because we wanted to focus on

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-10 Thread Steven Wu
Guowei, Thanks a lot for the proposal and starting the discussion thread. Very excited. For the big question of "Is the sink an operator or a topology?", I have a few related sub questions. * Where should we run the committers? * Is the committer parallel or single parallelism? * Can a single

[DISCUSS] FLIP-143: Unified Sink API

2020-09-10 Thread Guowei Ma
Hi, devs & users As discussed in FLIP-131[1], Flink will deprecate the DataSet API in favor of DataStream API and Table API. Users should be able to use DataStream API to write jobs that support both bounded and unbounded execution modes. However, Flink does not provide a sink API to guarantee