Re: [DISCUSS] FLIP-162: Consistent Flink SQL time function behavior
And btw, it is interesting to notice that AWS seems to take the approach that I suggested first: all functions are SQL-standard compliant, and only dedicated functions with a prefix such as CURRENT_ROW_TIMESTAMP deviate from the standard. Regards, Timo On 01.03.21 08:45, Timo Walther wrote: How about we simply go for your first approach by having [query-start, row, auto] as configuration parameters, where [auto] is the default? This sounds like a good consensus where everyone is happy, no? This also allows users to restore the old per-row behavior for all functions that we had before Flink 1.13. Regards, Timo On 26.02.21 11:10, Leonard Xu wrote: Thanks Joe for the great investigation. • Generally urging for semantics (batch > time of first query issued, streaming > row level). I discussed the thing now with Timo & Stephan: • It seems to go towards a config parameter, either [query-start, row] or [query-start, row, auto], and what is the default? • The main question seems to be: are we pushing the default towards streaming? (Probably related to the INSERT INTO behaviour in the SQL client.) It looks like opinions in this thread and user input agree that batch should use the time of the first query and streaming should use row level. Based on this, we should keep row level for streaming and query start for batch, just like the config parameter value [auto]. Currently Flink keeps row level for time functions in both batch and streaming jobs, thus we only need to update the behavior in batch. I tend not to expose an obscure configuration to users, especially as it is semantics-related. 1. We can make [auto] the default agreement: current Flink streaming users will feel nothing has changed, and current Flink batch users will see Flink batch corrected to match other good batch engines as well as the SQL standard. We can also provide a function CURRENT_ROW_TIMESTAMP [1] for Flink batch users who want row-level time functions. 2.
CURRENT_ROW_TIMESTAMP can also be used in Flink streaming; it has clear semantics, and we can encourage users to use it. In this way, we don't have to introduce an obscure configuration prematurely while making all users happy. What do you think? Best, Leonard [1] https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-current-row-timestamp.html Hope this helps, Thanks, Joe On 19.02.2021, at 10:25, Leonard Xu wrote: Hi, Joe Thanks for volunteering to investigate the user data on this topic. Do you have any progress here? Thanks, Leonard On Thu, Feb 4, 2021 at 3:08 PM Johannes Moser wrote: Hello, I will work with some users to get data on that. Thanks, Joe On 03.02.2021, at 14:58, Stephan Ewen wrote: Hi all! A quick thought on this thread: We see a typical stalemate here, as in so many discussions recently. One developer prefers it this way, another one another way. Both have pro/con arguments, it takes a lot of time from everyone, and still there is little progress in the discussion. Ultimately, this can only be decided by talking to the users. It would also be the best way to ensure that what we build is the intuitive and expected way for users. The less the users are into the deep aspects of Flink SQL, the better they can mirror what a common user would expect (a power user will figure it out anyway). Let's find a person to drive that, spell it out in the FLIP as "semantics TBD", and focus on the implementation of the parts that are agreed upon. For interviewing the users, here are some ideas for questions to look at: - How do they view the trade-off between stable semantics vs. out-of-the-box magic (faster getting started)? - How comfortable are they with the different meaning of "now()" in a streaming versus a batch context?
- What would be their expectation when moving a query with the time functions ("now()") from an unbounded stream (Kafka source without end offset) to a bounded stream (Kafka source with end offsets), which may switch execution to batch? Best, Stephan On Tue, Feb 2, 2021 at 3:19 PM Jark Wu wrote: Hi Fabian, I think we have an agreement that the functions should be evaluated at query start in batch mode, because all the other batch systems and traditional databases behave this way, which is standard-SQL compliant. *1. The point of divergence is: what should the behavior be in streaming mode?* From my point of view, I don't see any meaningful use for query-start evaluation in a 365-day long-running streaming job. And from my observation, CURRENT_TIMESTAMP is heavily used by Flink streaming users and they expect the current behavior. The SQL standard only provides a guideline for traditional batch systems; however, Flink is a leading stream processing system which is out of the scope of the SQL standard, and Flink should define the streaming standard. I think a standard should follow users' intuition. Therefore, I think we don't need to be standard
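The [query-start, row] alternatives debated above can be sketched with a toy evaluator. This is plain Python rather than Flink SQL, and all names in it are illustrative only; it merely demonstrates the semantic difference between pinning NOW() at query start and re-evaluating it per row:

```python
import time

def run_query(rows, mode):
    """Evaluate a hypothetical NOW() column for each row of a query.

    mode="query-start": NOW() is pinned once when the query starts
    (the SQL-standard batch behavior discussed in the thread).
    mode="row": NOW() is re-evaluated per row, i.e. the pre-1.13
    per-row / CURRENT_ROW_TIMESTAMP-style semantics.
    """
    query_start = time.perf_counter()
    out = []
    for r in rows:
        now = query_start if mode == "query-start" else time.perf_counter()
        out.append((r, now))
        time.sleep(0.01)  # stand-in for per-row processing latency
    return out

batch = run_query(["a", "b", "c"], mode="query-start")
streaming = run_query(["a", "b", "c"], mode="row")

# query-start: one timestamp for the whole query; row: a fresh one per row.
assert len({ts for _, ts in batch}) == 1
assert len({ts for _, ts in streaming}) == 3
```

The proposed [auto] value would simply pick "query-start" in batch mode and "row" in streaming mode.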
Re: [DISCUSS] Apache Flink Jira Process
Thanks for driving this discussion, Konstantin. I like the idea of having a bot that reminds the reporter/assignee/watchers about inactive tickets and, if needed, downgrades/closes them automatically. My two cents: We may have labels like "downgraded-by-bot" / "closed-by-bot", so that it's easier to filter and review tickets updated by the bot. We may want to review such tickets (e.g., monthly) in case a valid ticket failed to draw the attention of relevant committers and the reporter doesn't know who to ping. Thank you~ Xintong Song On Sat, Feb 27, 2021 at 1:37 AM Till Rohrmann wrote: > Thanks for starting this discussion Konstantin. I like your proposal and > also the idea of automating the tedious parts of it via a bot. > > Cheers, > Till > > On Fri, Feb 26, 2021 at 4:17 PM Konstantin Knauf wrote: > > > Dear Flink Community, > > > > I would like to start a discussion on improving, and to some extent simply > > defining, the way we work with Jira. Some aspects have been discussed a > > while back [1], but I would like to go a bit beyond that with the following > > goals in mind:
> >
> > - clearer communication and expectation management with the community
> >   - a user or contributor should be able to judge the urgency of a ticket by its priority
> >   - if a ticket is assigned to someone, the expectation that someone is working on it should hold
> > - generally reduce noise in Jira
> > - reduce the overhead for committers of asking about status updates of contributions or bug reports
> >   - "Are you still working on this?"
> >   - "Are you still interested in this?"
> >   - "Does this still happen on Flink 1.x?"
> >   - "Are you still experiencing this issue?"
> >   - "What is the status of the implementation?"
> > - while still encouraging users to add new tickets and to leave feedback about existing tickets
> >
> > Please see the full proposal here: https://docs.google.com/document/d/19VmykDSn4BHgsCNTXtN89R7xea8e3cUIl-uivW8L6W8/edit#
> >
> > The idea would be to discuss this proposal in this thread. If we come to a
> > conclusion, I'd document the proposal in the wiki [2] and we would then
> > vote on it (approval by "Lazy Majority").
> >
> > Cheers,
> > Konstantin
> >
> > [1] https://lists.apache.org/thread.html/rd34fb695d371c2bf0cbd1696ce190bac35dd78f29edd8c60d0c7ee71%40%3Cdev.flink.apache.org%3E
> > [2] https://cwiki.apache.org/confluence/display/FLINK/FLINK+Jira+field+definitions
> >
> > --
> > Konstantin Knauf
> >
> > https://twitter.com/snntrable
> >
> > https://github.com/knaufk
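The bot behavior discussed above, including the labeling idea, could look roughly like this. This is only a sketch: the field names, label names, and the 180-day threshold are made up for illustration and are not part of the actual proposal:

```python
from datetime import datetime, timedelta

STALE_AFTER = timedelta(days=180)  # hypothetical inactivity threshold

def triage(ticket, now):
    """Return the labels a hypothetical Jira bot would leave on a ticket.

    Inactive unassigned tickets get closed with a "closed-by-bot" marker;
    inactive assigned ones get downgraded with "downgraded-by-bot", so
    that bot-touched tickets are easy to filter and review monthly.
    """
    labels = list(ticket["labels"])
    if now - ticket["updated"] < STALE_AFTER:
        return labels  # still active: the bot leaves it alone
    if ticket["assignee"] is None:
        labels.append("closed-by-bot")
    elif ticket["priority"] != "Minor":
        labels.append("downgraded-by-bot")
    return labels

now = datetime(2021, 3, 1)
stale = {"labels": [], "updated": datetime(2020, 1, 1),
         "assignee": None, "priority": "Major"}
fresh = {"labels": [], "updated": datetime(2021, 2, 20),
         "assignee": "knaufk", "priority": "Major"}
assert triage(stale, now) == ["closed-by-bot"]
assert triage(fresh, now) == []
```

The label markers are what makes Xintong's monthly review possible: a single Jira filter on `labels in (closed-by-bot, downgraded-by-bot)` surfaces everything the bot touched.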
Re: [DISCUSS] FLIP-162: Consistent Flink SQL time function behavior
How about we simply go for your first approach by having [query-start, row, auto] as configuration parameters, where [auto] is the default? This sounds like a good consensus where everyone is happy, no? This also allows users to restore the old per-row behavior for all functions that we had before Flink 1.13. Regards, Timo On 26.02.21 11:10, Leonard Xu wrote: Thanks Joe for the great investigation. • Generally urging for semantics (batch > time of first query issued, streaming > row level). I discussed the thing now with Timo & Stephan: • It seems to go towards a config parameter, either [query-start, row] or [query-start, row, auto], and what is the default? • The main question seems to be: are we pushing the default towards streaming? (Probably related to the INSERT INTO behaviour in the SQL client.) It looks like opinions in this thread and user input agree that batch should use the time of the first query and streaming should use row level. Based on this, we should keep row level for streaming and query start for batch, just like the config parameter value [auto]. Currently Flink keeps row level for time functions in both batch and streaming jobs, thus we only need to update the behavior in batch. I tend not to expose an obscure configuration to users, especially as it is semantics-related. 1. We can make [auto] the default agreement: current Flink streaming users will feel nothing has changed, and current Flink batch users will see Flink batch corrected to match other good batch engines as well as the SQL standard. We can also provide a function CURRENT_ROW_TIMESTAMP [1] for Flink batch users who want row-level time functions. 2. CURRENT_ROW_TIMESTAMP can also be used in Flink streaming; it has clear semantics, and we can encourage users to use it. In this way, we don't have to introduce an obscure configuration prematurely while making all users happy. What do you think?
Best, Leonard [1] https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-current-row-timestamp.html Hope this helps, Thanks, Joe On 19.02.2021, at 10:25, Leonard Xu wrote: Hi, Joe Thanks for volunteering to investigate the user data on this topic. Do you have any progress here? Thanks, Leonard On Thu, Feb 4, 2021 at 3:08 PM Johannes Moser wrote: Hello, I will work with some users to get data on that. Thanks, Joe On 03.02.2021, at 14:58, Stephan Ewen wrote: Hi all! A quick thought on this thread: We see a typical stalemate here, as in so many discussions recently. One developer prefers it this way, another one another way. Both have pro/con arguments, it takes a lot of time from everyone, and still there is little progress in the discussion. Ultimately, this can only be decided by talking to the users. It would also be the best way to ensure that what we build is the intuitive and expected way for users. The less the users are into the deep aspects of Flink SQL, the better they can mirror what a common user would expect (a power user will figure it out anyway). Let's find a person to drive that, spell it out in the FLIP as "semantics TBD", and focus on the implementation of the parts that are agreed upon. For interviewing the users, here are some ideas for questions to look at: - How do they view the trade-off between stable semantics vs. out-of-the-box magic (faster getting started)? - How comfortable are they with the different meaning of "now()" in a streaming versus a batch context? - What would be their expectation when moving a query with the time functions ("now()") from an unbounded stream (Kafka source without end offset) to a bounded stream (Kafka source with end offsets), which may switch execution to batch? Best, Stephan On Tue, Feb 2, 2021 at 3:19 PM Jark Wu wrote: Hi Fabian, I think we have an agreement that the functions should be evaluated at query start in batch mode.
Because all the other batch systems and traditional databases behave this way, which is standard-SQL compliant. *1. The point of divergence is: what should the behavior be in streaming mode?* From my point of view, I don't see any meaningful use for query-start evaluation in a 365-day long-running streaming job. And from my observation, CURRENT_TIMESTAMP is heavily used by Flink streaming users and they expect the current behavior. The SQL standard only provides a guideline for traditional batch systems; however, Flink is a leading stream processing system which is out of the scope of the SQL standard, and Flink should define the streaming standard. I think a standard should follow users' intuition. Therefore, I think we don't need to be standard-SQL compliant at this point because users don't expect it. Changing the behavior of the functions to evaluate at query start in streaming mode would hurt most Flink SQL users while gaining us nothing; we should avoid this. *2. Does it break the unified streaming-batch semantics?* I don't think so. First of
Re: [DISCUSS]FLIP-163: SQL Client Improvements
We could also think about reading this config option in the Table API. The effect would be to call `await()` directly in an execute call. I could also imagine this to be useful esp. when you fire a lot of INSERT INTO queries. We had the case before that users were confused that the execution happens asynchronously; such an option could prevent this from happening again. Regards, Timo On 01.03.21 05:14, Kurt Young wrote: I also asked some users for their opinion on introducing a config prefixed with "table" that has no effect on methods in the Table API and SQL. All of them were kind of shocked by such a question, asking why we would do anything like this. This kind of reaction actually doesn't surprise me a lot, so I jumped in and challenged this config option even after the FLIP had already been accepted. If we only have to define the execution behavior for multiple statements in the SQL client, we should only introduce a config option whose name tells users its effective scope. Prefixing with "table" is definitely not a good idea here. Best, Kurt On Fri, Feb 26, 2021 at 9:39 PM Leonard Xu wrote: Hi, all It looks like there's only one divergence in this thread, about the option [ table | sql-client ].dml-sync; correct me if I'm wrong. 1. Leaving the context of this thread, from a user's perspective, the table.xx configurations should take effect in Table API & SQL, and the sql-client.xx configurations should only take effect in the SQL client. In my (the user's) opinion, other explanations are counterintuitive. 2. It should be pointed out that all existing table.xx configurations like table.exec.state.ttl, table.optimizer.agg-phase-strategy, table.local-time-zone, etc., and the proposed sql-client.xx configurations like sql-client.verbose, sql-client.execution.max-table-result.rows comply with this convention. 3. Considering the portability to support different CLI tools (sql-client, sql-gateway, etc.), I prefer table.dml-sync.
In addition, I think sql-client/sql-gateway/other CLI tools can be placed outside the flink-table module, even in an external project; this should not affect our conclusion. Hope this can help you. Best, Leonard On 25.02.2021, at 18:51, Shengkai Fang wrote: Hi, everyone. Here are some summaries of the discussion about the option. If the summary has errors, please correct me. `table.dml-sync`: - takes effect for `executeMultiSql` and the SQL client - benefit: SQL script portability. One script for all platforms. - drawback: Doesn't work for `TableEnvironment#executeSql`. `table.multi-dml-sync`: - takes effect for `executeMultiSql` and the SQL client - benefit: SQL script portability - drawback: It's confusing when the SQL script has only one DML statement but the option `table.multi-dml-sync` still needs to be set. `client.dml-sync`: - takes effect for the SQL client only - benefit: clear definition. - drawback: Every platform needs to define its own option. Bad SQL script portability. Just as Jark said, I think `table.dml-sync` is a good choice if we can extend its scope and make this option work for `executeSql`. It's straightforward and users can use this option now in the Table API. The drawback is that `TableResult#await` plays the same role as the option. I don't think the drawback is really critical because many systems have commands that play the same role under different names. Best, Shengkai Timo Walther wrote on Thu, Feb 25, 2021 at 4:23 PM: The `table.` prefix is meant to be a general option in the table ecosystem, not necessarily attached to the Table API or SQL Client. That's why the SQL Client is also located in the `flink-table` module. My main concern is SQL script portability. Declaring the sync/async behavior will happen in many SQL scripts. And users should be able to easily switch from the SQL Client to some commercial product without needing to change the script again. Sure, we can change from `sql-client.dml-sync` to `table.dml-sync` later, but that would mean introducing future confusion.
An app name (which `sql-client` kind of is) should not be part of a config option key if other apps will need the same kind of option. Regards, Timo On 24.02.21 08:59, Jark Wu wrote: From my point of view, I also prefer "sql-client.dml-sync", because the behavior of this configuration is very clear. Even if we introduce a new config in the future, e.g. `table.dml-sync`, we can also deprecate the sql-client one. Introducing a "table." configuration without any implementation will confuse users a lot, as they expect it to take effect on the Table API. If we want to introduce a unified "table.dml-sync" option, I prefer that it be implemented on the Table API and affect all the DMLs on the Table API (`tEnv.executeSql`, `Table.executeInsert`, `StatementSet`), as I have mentioned before [1]. It would be very straightforward that it affects all the DMLs on SQL CLI and TableEnvironment (including `executeSql`, `StatementSet`, `Table#executeInsert`, etc.). This can also make SQL CLI easy to support
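The sync/async behavior the dml-sync option would control can be modeled with a toy stand-in for `TableResult`. This is plain Python, not Flink's actual API (Flink's real method is the Java `TableResult#await()`); it only illustrates why `await()` and the config option play the same role:

```python
import threading
import time

class TableResult:
    """Toy stand-in for Flink's TableResult: the DML job runs on a
    background thread, and await_() blocks until it has finished."""

    def __init__(self, job):
        self._done = threading.Event()

        def run():
            job()
            self._done.set()

        threading.Thread(target=run).start()

    def await_(self):  # named await() in Flink's Java API
        self._done.wait()

def execute_sql(dml_sync):
    """Submit a fake INSERT INTO job; block on it only if dml-sync is set."""
    result = TableResult(lambda: time.sleep(0.2))
    if dml_sync:
        result.await_()  # what a dml-sync=true option would do internally
    return result

sync_result = execute_sql(dml_sync=True)
assert sync_result._done.is_set()       # sync: job finished before returning

async_result = execute_sql(dml_sync=False)
assert not async_result._done.is_set()  # async: still running when we get the result
async_result.await_()                   # the explicit per-call alternative
assert async_result._done.is_set()
```

This is why the drawback Shengkai mentions is mild: with or without the option, an explicit `await()` call achieves the synchronous behavior for a single statement.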
[jira] [Created] (FLINK-21532) Make CatalogTableImpl#toProperties and CatalogTableImpl#fromProperties case sensitive
Lsw_aka_laplace created FLINK-21532: --- Summary: Make CatalogTableImpl#toProperties and CatalogTableImpl#fromProperties case sensitive Key: FLINK-21532 URL: https://issues.apache.org/jira/browse/FLINK-21532 Project: Flink Issue Type: Improvement Components: Table SQL / API Reporter: Lsw_aka_laplace -- This message was sent by Atlassian Jira (v8.3.4#803005)
Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
(Sorry that I repeat this mail since the last one was not added to the same mailing list thread; very sorry for the inconvenience.) Hi all, Many thanks for all the deep thoughts! > How to implement the stop-with-savepoint --drain/terminate command with > this model: One idea could be to tell the sources that they should stop > reading. This should trigger the EndOfPartitionEvent to be sent > downstream. > This will transition all operators into the TERMINATING state. > > Currently, EndOfPartitionEvent is only issued after StreamTask.invoke > has returned. To achieve the above, some possible work would be required: > * Promote `EndOfPartitionEvent` from `Task` to `StreamTask`. This may > have some interference with BatchTask or the network IO stack. > * Or introduce a stream-task-level `EndOfUserRecordsEvent` (from PR#14831 > @Yun @Piotr). > * Or special handling of `CheckpointType.SAVEPOINT_TERMINATE`. I share Kezhu's concern about whether we need to introduce a new message to notify the operators to endOfInput/close. The main concerns with reusing the EndOfPartitionEvent are: 1. The EndOfPartitionEvent is currently emitted in Task instead of StreamTask; we would need some refactoring here. 2. Currently the InputGate/InputChannel would be released after the downstream tasks have received the EndOfPartitionEvent from all the input channels; this would make the following checkpoint impossible to perform since we could not emit barriers to downstream tasks. Regarding MAX_WATERMARK, I still don't fully understand the issue, since it seems to me that Flink doesn't snapshot the watermark now: if the job fails over, the window operator would reload all the pending windows from before they were flushed by MAX_WATERMARK, and when the job finishes again, it would re-emit the MAX_WATERMARK? Best, Yun -- From:Kezhu Wang Send Time:2021 Mar. 1 (Mon.)
01:26 To:Till Rohrmann Cc:Piotr Nowojski ; Guowei Ma ; dev ; Yun Gao ; jingsongl...@gmail.com Subject:Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished In "stop-with-savepoint --drain", MAX_WATERMARK is not an issue. For a normally finishing task, not allowing unaligned checkpoints does not solve the problem, as MAX_WATERMARK could be persisted in a downstream task. When the scenario @Piotr depicted occurs, the downstream (or further-downstream) window operator will count all inputs as late. > If we ensure that the MAX_WATERMARK is only persisted in state if a recovery will trigger immediately the shut down of this operator, then it shouldn't be an issue. You are right in case the assumption holds; I had the same thought as you before. But I am kind of worried about whether it is too perfect and hence fragile. It requires a strong guarantee from the implementation that a recovery from the TERMINATING stage will go directly to that stage. > I think the MAX_WATERMARK event should be sent either just before or with the EndOfPartitionEvent. I prefer "with the EndOfPartitionEvent". EndOfPartitionEvent itself already carries what the ending MAX_WATERMARK tries to express. Maybe we can reuse it? @Piotr A preview work [1] from @Yun in PR#14831 interprets EndOfPartitionEvent as a checkpoint barrier if there are pending checkpoints. [1]: https://github.com/gaoyunhaii/flink/commit/1aaa80fb0afad13a314b06783c61c01b9414f91b#diff-d4568b34060bc589cd1354bd125c35aca993dd0c9fe9e4460dfed73501116459R177 Best, Kezhu Wang On February 28, 2021 at 21:23:31, Till Rohrmann (trohrm...@apache.org) wrote: I think you are right about the problem of endOfInput. endOfInput should not be used to commit final results. In fact, if this termination fails, then we might end up with a different outcome of the job, which is equally valid as the one before the failure. Concerning unaligned checkpoints, I think they don't play well together with draining a streaming pipeline.
The problem is that in the draining case you want to process all records which are still in flight, but unaligned checkpoints don't guarantee this, as they can jump over in-flight records. I think the MAX_WATERMARK event should be sent either just before or with the EndOfPartitionEvent. If we ensure that the MAX_WATERMARK is only persisted in state if a recovery will immediately trigger the shutdown of this operator, then it shouldn't be an issue. Cheers, Till On Sun, Feb 28, 2021 at 9:06 AM Kezhu Wang wrote: > Hi Till, > > Just for bookkeeping, some observations from the current implementation. > > > With this model, the final checkpoint is quite simple because it is > ingrained in the lifecycle of an operator. Differently said, an operator > will only terminate after it has committed its side effects and seen the > notifyCheckpointComplete message (if it is stateful). > > Currently, we could not mark this operator (or subtask) as terminated since > the result of `notifyCheckpointComplete` (possible side-effect committing) is > not taken into account for the corresponding checkpoint. The job
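The lateness concern around a persisted MAX_WATERMARK can be illustrated with a toy window operator. This is a drastic simplification of Flink's real windowing (names and the lateness rule are schematic), but it shows why every replayed in-flight record becomes late once MAX_WATERMARK has been restored from state:

```python
MAX_WATERMARK = float("inf")  # models Flink's Watermark.MAX_WATERMARK

def window_operator(events, watermark):
    """Toy event-time operator: an event whose timestamp is not ahead of
    the current watermark is classified as late and dropped."""
    on_time, late = [], []
    for ts, payload in events:
        (late if ts <= watermark else on_time).append(payload)
    return on_time, late

events = [(100, "a"), (200, "b")]

# Normal running watermark: both in-flight records are on time.
assert window_operator(events, watermark=50) == (["a", "b"], [])

# MAX_WATERMARK restored from a checkpoint taken while terminating:
# every replayed record is now late, which is the scenario Kezhu and
# Piotr worry about when the recovery does not shut down immediately.
assert window_operator(events, watermark=MAX_WATERMARK) == ([], ["a", "b"])
```

Till's condition (only persist MAX_WATERMARK if recovery immediately shuts the operator down) avoids the second case precisely because the replayed records would never be evaluated against the restored watermark.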
Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
Hi all, Many thanks for all the deep thoughts! > How to implement the stop-with-savepoint --drain/terminate command with > this model: One idea could be to tell the sources that they should stop > reading. This should trigger the EndOfPartitionEvent to be sent > downstream. > This will transition all operators into the TERMINATING state. > > Currently, EndOfPartitionEvent is only issued after StreamTask.invoke > has returned. To achieve the above, some possible work would be required: > * Promote `EndOfPartitionEvent` from `Task` to `StreamTask`. This may > have some interference with BatchTask or the network IO stack. > * Or introduce a stream-task-level `EndOfUserRecordsEvent` (from PR#14831 > @Yun @Piotr). > * Or special handling of `CheckpointType.SAVEPOINT_TERMINATE`. I share Kezhu's concern about whether we need to introduce a new message to notify the operators to endOfInput/close. The main concerns with reusing the EndOfPartitionEvent are: 1. The EndOfPartitionEvent is currently emitted in Task instead of StreamTask; we would need some refactoring here. 2. Currently the InputGate/InputChannel would be released after the downstream tasks have received the EndOfPartitionEvent from all the input channels; this would make the following checkpoint impossible to perform since we could not emit barriers to downstream tasks. Regarding MAX_WATERMARK, I still don't fully understand the issue, since it seems to me that Flink doesn't snapshot the watermark now: if the job fails over, the window operator would reload all the pending windows from before they were flushed by MAX_WATERMARK, and when the job finishes again, it would re-emit the MAX_WATERMARK? Best, Yun -- From:Kezhu Wang Send Time:2021 Mar. 1 (Mon.) 01:26 To:Till Rohrmann Cc:Piotr Nowojski ; Guowei Ma ; dev ; Yun Gao ; jingsongl...@gmail.com Subject:Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished In "stop-with-savepoint --drain", MAX_WATERMARK is not an issue.
For a normally finishing task, not allowing unaligned checkpoints does not solve the problem, as MAX_WATERMARK could be persisted in a downstream task. When the scenario @Piotr depicted occurs, the downstream (or further-downstream) window operator will count all inputs as late. > If we ensure that the MAX_WATERMARK is only persisted in state if a recovery will trigger immediately the shut down of this operator, then it shouldn't be an issue. You are right in case the assumption holds; I had the same thought as you before. But I am kind of worried about whether it is too perfect and hence fragile. It requires a strong guarantee from the implementation that a recovery from the TERMINATING stage will go directly to that stage. > I think the MAX_WATERMARK event should be sent either just before or with the EndOfPartitionEvent. I prefer "with the EndOfPartitionEvent". EndOfPartitionEvent itself already carries what the ending MAX_WATERMARK tries to express. Maybe we can reuse it? @Piotr A preview work [1] from @Yun in PR#14831 interprets EndOfPartitionEvent as a checkpoint barrier if there are pending checkpoints. [1]: https://github.com/gaoyunhaii/flink/commit/1aaa80fb0afad13a314b06783c61c01b9414f91b#diff-d4568b34060bc589cd1354bd125c35aca993dd0c9fe9e4460dfed73501116459R177 Best, Kezhu Wang On February 28, 2021 at 21:23:31, Till Rohrmann (trohrm...@apache.org) wrote: I think you are right about the problem of endOfInput. endOfInput should not be used to commit final results. In fact, if this termination fails, then we might end up with a different outcome of the job, which is equally valid as the one before the failure. Concerning unaligned checkpoints, I think they don't play well together with draining a streaming pipeline. The problem is that in the draining case you want to process all records which are still in flight, but unaligned checkpoints don't guarantee this, as they can jump over in-flight records. I think the MAX_WATERMARK event should be sent either just before or with the EndOfPartitionEvent.
If we ensure that the MAX_WATERMARK is only persisted in state if a recovery will immediately trigger the shutdown of this operator, then it shouldn't be an issue. Cheers, Till On Sun, Feb 28, 2021 at 9:06 AM Kezhu Wang wrote: > Hi Till, > > Just for bookkeeping, some observations from the current implementation. > > > With this model, the final checkpoint is quite simple because it is > ingrained in the lifecycle of an operator. Differently said, an operator > will only terminate after it has committed its side effects and seen the > notifyCheckpointComplete message (if it is stateful). > > Currently, we could not mark this operator (or subtask) as terminated since > the result of `notifyCheckpointComplete` (possible side-effect committing) is > not taken into account for the corresponding checkpoint. The job has to run to > the next safe point (finished or next checkpoint success) to be marked as > "terminated". > > > How to implement the
[jira] [Created] (FLINK-21531) Introduce pluggable Parser
Rui Li created FLINK-21531: -- Summary: Introduce pluggable Parser Key: FLINK-21531 URL: https://issues.apache.org/jira/browse/FLINK-21531 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: Rui Li Fix For: 1.13.0
[jira] [Created] (FLINK-21530) Precompute TypeName's canonical string representation
Tzu-Li (Gordon) Tai created FLINK-21530: --- Summary: Precompute TypeName's canonical string representation Key: FLINK-21530 URL: https://issues.apache.org/jira/browse/FLINK-21530 Project: Flink Issue Type: Improvement Components: Stateful Functions Reporter: Tzu-Li (Gordon) Tai Assignee: Tzu-Li (Gordon) Tai Fix For: statefun-3.0.0 There's room for improvement in StateFun's {{PersistedRemoteFunctionValues}}, where we currently concatenate strings to build the typename string for each state value we attach to a {{ToFunction}} message. This extra work can easily be avoided by precomputing the canonical typename string, since {{TypeName}}s are immutable.
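The precomputation idea can be sketched as follows. This is a Python analogy, not the StateFun codebase (which is Java), and the exact canonical format shown is an assumption for illustration; the point is simply that immutability lets the string be built once at construction instead of on every access:

```python
class TypeName:
    """Immutable (namespace, name) pair. Because instances never change,
    the canonical string can be computed once in the constructor rather
    than concatenated on every access. This mirrors the FLINK-21530 idea."""

    __slots__ = ("namespace", "name", "canonical")

    def __init__(self, namespace, name):
        self.namespace = namespace
        self.name = name
        # Precompute once; immutability guarantees it never goes stale.
        # The "namespace/name" format here is an illustrative assumption.
        self.canonical = f"{namespace}/{name}"

t = TypeName("example", "greeter")
assert t.canonical == "example/greeter"
# Every access returns the same prebuilt string; no per-message rebuild.
assert t.canonical is t.canonical
```

The same trade applies in any language: a tiny amount of extra memory per instance in exchange for removing repeated string concatenation from the per-message hot path.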
[jira] [Created] (FLINK-21529) FLIP-152: Hive Query Syntax Compatibility
Rui Li created FLINK-21529: -- Summary: FLIP-152: Hive Query Syntax Compatibility Key: FLINK-21529 URL: https://issues.apache.org/jira/browse/FLINK-21529 Project: Flink Issue Type: New Feature Reporter: Rui Li
Re: [DISCUSS]FLIP-163: SQL Client Improvements
I also asked some users for their opinion on introducing a config prefixed with "table" that has no effect on methods in the Table API and SQL. All of them were kind of shocked by such a question, asking why we would do anything like this. This kind of reaction actually doesn't surprise me a lot, so I jumped in and challenged this config option even after the FLIP had already been accepted. If we only have to define the execution behavior for multiple statements in the SQL client, we should only introduce a config option whose name tells users its effective scope. Prefixing with "table" is definitely not a good idea here. Best, Kurt On Fri, Feb 26, 2021 at 9:39 PM Leonard Xu wrote: > Hi, all > > It looks like there's only one divergence in this thread, about the option [ table | sql-client > ].dml-sync; correct me if I'm wrong. > > 1. Leaving the context of this thread, from a user's perspective, > the table.xx configurations should take effect in Table API & SQL, > and the sql-client.xx configurations should only take effect in the SQL client. > In my (the user's) opinion, other explanations are counterintuitive. > > 2. It should be pointed out that all existing table.xx configurations > like table.exec.state.ttl, table.optimizer.agg-phase-strategy, > table.local-time-zone, etc., and the proposed sql-client.xx configurations > like sql-client.verbose, sql-client.execution.max-table-result.rows > comply with this convention. > > 3. Considering the portability to support different CLI tools (sql-client, > sql-gateway, etc.), I prefer table.dml-sync. > > In addition, I think sql-client/sql-gateway/other CLI tools can be placed > outside the flink-table module, even in an external project; this should not > affect our conclusion. > > > Hope this can help you. > > > Best, > Leonard > > > > > On 25.02.2021, at 18:51, Shengkai Fang wrote: > > > > Hi, everyone. > > > > Here are some summaries of the discussion about the option. If the summary > > has errors, please correct me.
> > > > `table.dml-sync`: > > - takes effect for `executeMultiSql` and the SQL client > > - benefit: SQL script portability. One script for all platforms. > > - drawback: Doesn't work for `TableEnvironment#executeSql`. > > > > `table.multi-dml-sync`: > > - takes effect for `executeMultiSql` and the SQL client > > - benefit: SQL script portability > > - drawback: It's confusing when the SQL script has only one DML statement but > > the option `table.multi-dml-sync` still needs to be set. > > > > `client.dml-sync`: > > - takes effect for the SQL client only > > - benefit: clear definition. > > - drawback: Every platform needs to define its own option. Bad SQL script > > portability. > > > > Just as Jark said, I think `table.dml-sync` is a good choice if we can > > extend its scope and make this option work for `executeSql`. > > It's straightforward and users can use this option now in the Table API. The > > drawback is that `TableResult#await` plays the same role as the option. I > > don't think the drawback is really critical because many systems have > > commands that play the same role under different names. > > > > Best, > > Shengkai > > > > Timo Walther wrote on Thu, Feb 25, 2021 at 4:23 PM: > > > >> The `table.` prefix is meant to be a general option in the table > >> ecosystem, not necessarily attached to the Table API or SQL Client. That's > >> why the SQL Client is also located in the `flink-table` module. > >> > >> My main concern is SQL script portability. Declaring the sync/async > >> behavior will happen in many SQL scripts. And users should be able to easily > >> switch from the SQL Client to some commercial product without needing to > >> change the script again. > >> > >> Sure, we can change from `sql-client.dml-sync` to `table.dml-sync` later, > >> but that would mean introducing future confusion. An app name (which > >> `sql-client` kind of is) should not be part of a config option key if > >> other apps will need the same kind of option.
> >> > >> Regards, > >> Timo > >> > >> > >> On 24.02.21 08:59, Jark Wu wrote: > From my point of view, I also prefer "sql-client.dml-sync", > >>> because the behavior of this configuration is very clear. > >>> Even if we introduce a new config in the future, e.g. `table.dml-sync`, > >>> we can also deprecate the sql-client one. > >>> > >>> Introducing a "table." configuration without any implementation > >>> will confuse users a lot, as they would expect it to take effect on > >>> the Table API. > >>> > >>> If we want to introduce a unified "table.dml-sync" option, I'd prefer > >>> that it be implemented on the Table API and affect all the DMLs on the > >>> Table API (`tEnv.executeSql`, `Table.executeInsert`, `StatementSet`), > >>> as I have mentioned before [1]. > >>> > It would be very straightforward that it affects all the DMLs on SQL > CLI > >>> and > >>> TableEnvironment (including `executeSql`, `StatementSet`, > >>> `Table#executeInsert`, etc.). > >>> This can also make it easy for the SQL CLI to support this configuration by > passing > >>> through
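The dispatch the thread is debating can be sketched as follows. This is a hypothetical illustration, not SQL Client code: a single `table.dml-sync` option (the name is the proposal under discussion, not a shipped Flink option) decides whether DML submission blocks until the job finishes, mirroring what `TableResult#await` does programmatically.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class DmlSyncSketch {

    /**
     * Hypothetical: submit a DML statement and, when 'table.dml-sync'
     * is true, block until the job reaches a terminal state.
     */
    static String execute(Map<String, String> config, CompletableFuture<String> job) {
        boolean sync = Boolean.parseBoolean(config.getOrDefault("table.dml-sync", "false"));
        if (sync) {
            return job.join(); // behaves like calling TableResult#await explicitly
        }
        return "SUBMITTED";    // async: return as soon as the job is submitted
    }

    public static void main(String[] args) {
        CompletableFuture<String> job = CompletableFuture.completedFuture("FINISHED");
        System.out.println(execute(Map.of("table.dml-sync", "true"), job)); // FINISHED
        System.out.println(execute(Map.of(), job));                         // SUBMITTED
    }
}
```

With one option spelled this way, the same SQL script stays portable: any client that understands `table.dml-sync` applies the same blocking rule, which is the portability argument made above.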
[jira] [Created] (FLINK-21528) Rest Api Support Wildcard
wxmimperio created FLINK-21528: -- Summary: Rest Api Support Wildcard Key: FLINK-21528 URL: https://issues.apache.org/jira/browse/FLINK-21528 Project: Flink Issue Type: Improvement Components: Runtime / REST Affects Versions: 1.11.1 Reporter: wxmimperio Attachments: image-2021-03-01-12-00-36-269.png, image-2021-03-01-12-05-05-881.png, image-2021-03-01-12-06-15-340.png !image-2021-03-01-12-00-36-269.png! To obtain detailed metrics information, you need to specify the complete metric name. But these names are automatically generated and can only be discovered by fetching the list of all metric names and then filtering by keywords. For example, the content of the red box is automatically generated and cannot be known in advance. !image-2021-03-01-12-06-15-340.png! Currently I can only get all the metric names and then filter for TableSourceScan and join-time-max. If I could request http://xxx/metrics/get=*join-time-max, I could easily filter what I want. -- This message was sent by Atlassian Jira (v8.3.4#803005)
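The requested `get=*join-time-max` lookup is essentially glob matching over the flat list of metric names. A sketch of the glob-to-regex translation such a REST handler could apply; the class and the sample metric names are illustrative, not Flink's actual handler code:

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class MetricGlobFilter {

    /** Translate a simple glob ('*' and '?') into an anchored regex. */
    static Pattern globToRegex(String glob) {
        StringBuilder re = new StringBuilder("^");
        for (char c : glob.toCharArray()) {
            switch (c) {
                case '*': re.append(".*"); break;
                case '?': re.append('.'); break;
                default:  re.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(re.append('$').toString());
    }

    /** Keep only the metric names matching the glob pattern. */
    static List<String> filter(List<String> metricNames, String glob) {
        Pattern p = globToRegex(glob);
        return metricNames.stream()
                .filter(n -> p.matcher(n).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> names = List.of(
                "0.Join.join-time-max",
                "0.TableSourceScan.numRecordsOut",
                "1.Join.join-time-max");
        // Same effect the reporter asks for with get=*join-time-max
        System.out.println(filter(names, "*join-time-max"));
    }
}
```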
[jira] [Created] (FLINK-21527) fromValues function cost enormous network buffer
Spongebob created FLINK-21527: - Summary: fromValues function cost enormous network buffer Key: FLINK-21527 URL: https://issues.apache.org/jira/browse/FLINK-21527 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.12.1 Reporter: Spongebob Attachments: image-2021-03-01-11-37-25-416.png When using fromValues to build a Table object, Flink consumes an enormous number of network buffers; even less than 100 KB of raw data consumes more than 2048 network memory buffers.
[jira] [Created] (FLINK-21526) Replace scheduler benchmarks with wrapper classes
Zhilong Hong created FLINK-21526: Summary: Replace scheduler benchmarks with wrapper classes Key: FLINK-21526 URL: https://issues.apache.org/jira/browse/FLINK-21526 Project: Flink Issue Type: Sub-task Components: Benchmarks, Runtime / Coordination Affects Versions: 1.13.0 Reporter: Zhilong Hong Fix For: 1.13.0 After moving the implementations of the scheduler benchmarks to the Flink repository, we can replace the scheduler benchmarks in flink-benchmark with wrapper classes. This will ensure that the compilation and execution of flink-benchmark do not fail because of modifications to interfaces/methods used by the scheduler benchmarks.
[jira] [Created] (FLINK-21525) Move scheduler benchmarks to Flink and add unit tests
Zhilong Hong created FLINK-21525: Summary: Move scheduler benchmarks to Flink and add unit tests Key: FLINK-21525 URL: https://issues.apache.org/jira/browse/FLINK-21525 Project: Flink Issue Type: Sub-task Components: Benchmarks, Runtime / Coordination Affects Versions: 1.13.0 Reporter: Zhilong Hong Fix For: 1.13.0 To improve the stability of scheduler benchmarks, we'll move the implementation of scheduler benchmarks to the Flink repository and add several unit tests. When someone modifies the interfaces/methods used by scheduler benchmarks, the unit tests will fail.
[jira] [Created] (FLINK-21524) Replace scheduler benchmarks with wrapper classes
Zhilong Hong created FLINK-21524: Summary: Replace scheduler benchmarks with wrapper classes Key: FLINK-21524 URL: https://issues.apache.org/jira/browse/FLINK-21524 Project: Flink Issue Type: Improvement Components: Benchmarks, Runtime / Coordination Affects Versions: 1.13.0 Reporter: Zhilong Hong Fix For: 1.13.0 In FLINK-21514 we found that when someone modifies interfaces and methods used by the scheduler benchmarks, the compilation and execution of flink-benchmark may break. To improve the stability of the scheduler benchmarks, we decided to replace the scheduler benchmarks with wrapper/executor classes and move the current implementations to the Flink repository. We'll also implement several unit tests based on those implementations. This way, when someone modifies interfaces and methods used by the scheduler benchmarks, the unit tests fail, and whoever made the change must fix the broken benchmarks.
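The wrapper/executor split described in these three issues can be sketched as follows. The class names and the toy workload are illustrative, not the actual flink-benchmark code: the benchmark project compiles only against a thin wrapper, while the implementation (covered by unit tests) lives in the Flink repository and may change together with internal interfaces.

```java
// Lives in the Flink repo, next to its unit tests; free to change with internal interfaces.
class SchedulerBenchmarkImpl {
    private int tasks;

    void setup(int parallelism) {
        tasks = parallelism;
    }

    long run() {
        // Stand-in for the scheduling/deployment work being measured.
        return (long) tasks * tasks;
    }
}

// Lives in flink-benchmark; a thin, stable facade so the JMH code never touches internals directly.
public class SchedulerBenchmarkWrapper {
    private final SchedulerBenchmarkImpl impl = new SchedulerBenchmarkImpl();

    public long benchmark(int parallelism) {
        impl.setup(parallelism);
        return impl.run();
    }

    public static void main(String[] args) {
        System.out.println(new SchedulerBenchmarkWrapper().benchmark(10)); // 100
    }
}
```

If an internal interface used by `SchedulerBenchmarkImpl` changes, its unit tests in the Flink repo fail immediately, while `SchedulerBenchmarkWrapper` in flink-benchmark keeps compiling — which is exactly the stability argument made in the issue.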
Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
In “stop-with-savepoint --drain”, MAX_WATERMARK is not an issue. For a normally finishing task, not allowing unaligned checkpoints does not solve the problem, as MAX_WATERMARK could be persisted in a downstream task. When the scenario @Piotr depicted occurs, the downstream (or further downstream) window operator will count all inputs as late. > If we ensure that the MAX_WATERMARK is only persisted in state if a recovery will trigger immediately the shut down of this operator, then it shouldn't be an issue. You are right in case the assumption holds; I had the same thought before. But I worry this is too perfect to be robust: it requires a strong guarantee from the implementation that a recovery from the TERMINATING stage goes directly back to that stage. > I think the MAX_WATERMARK event should be sent either just before or with the EndOfPartitionEvent. I prefer “with the EndOfPartitionEvent”. EndOfPartitionEvent itself already carries what the ending MAX_WATERMARK tries to express. Maybe we can reuse it? @Piotr A preview work [1] from @Yun in PR#14831 treats EndOfPartitionEvent as a checkpoint barrier if there are pending checkpoints. [1]: https://github.com/gaoyunhaii/flink/commit/1aaa80fb0afad13a314b06783c61c01b9414f91b#diff-d4568b34060bc589cd1354bd125c35aca993dd0c9fe9e4460dfed73501116459R177 Best, Kezhu Wang On February 28, 2021 at 21:23:31, Till Rohrmann (trohrm...@apache.org) wrote: I think you are right about the problem of endOfInput. endOfInput should not be used to commit final results. In fact, if this termination fails, then we might end up with a different outcome of the job which is equally valid as the one before the failure. Concerning unaligned checkpoints, I think they don't play well together with draining a streaming pipeline. The problem is that in the draining case you want to process all records which are still in flight, but unaligned checkpoints don't guarantee this as they can jump over in-flight records.
I think the MAX_WATERMARK event should be sent either just before or with the EndOfPartitionEvent. If we ensure that the MAX_WATERMARK is only persisted in state if a recovery will trigger immediately the shut down of this operator, then it shouldn't be an issue. Cheers, Till On Sun, Feb 28, 2021 at 9:06 AM Kezhu Wang wrote: > Hi Till, > > Just for bookkeeping, some observations from current implementation. > > > With this model, the final checkpoint is quite simple because it is > ingrained in the lifecycle of an operator. Differently said an operator > will only terminate after it has committed its side effects and seen the > notifyCheckpointComplete message (if it is stateful). > > Currently, we could not mark this operator(or subtask) as terminated since > result of `notifyCheckpointComplete`(possible side effect committing) is > not taken into account of the belonging checkpoint. The job has to run to > next safe point(finished or next checkpoint success) to be marked as > “terminated”. > > > How to implement the stop-with-savepoint --drain/terminate command with > this model: One idea could be to tell the sources that they should stop > reading. This should trigger the EndOfPartitionEvent to be sent > downstream. > This will transition all operators into the TERMINATING state. > > Currently, EndOfPartitionEvent is only issued after StreamTask.invoke > returned. To achieve above, possible works should be required: > * Promote `EndOfPartitionEvent` from `Task` to `StreamTask`. This may > have some interferences with BatchTask or network io stack. > * Or introducing stream task level `EndOfUserRecordsEvent`(from PR#14831 > @Yun @Piotr) > * Or special handling of `CheckpointType.SAVEPOINT_TERMINATE`. > > Besides this, I would like to quote some discussion from FLINK-21467 > between @Piotr and me: > > From @Piotr > > Note, that it's not only that endOfInput can be called multiple times. 
> There is a very remote possibility that the following scenario will happen: > 1. checkpoint is taken (successfully) > 2. sources are finishing > 3. endOfInput is issued > 4. job fails > 5. job restarts to checkpoint 1. > 6. after failover, because of some non deterministic logic in the source, > sources are not finishing > > From me > > But I think there is little work Flink can do to cope with this kind of > issues. The checkpoint could be a savepoint triggered from user side and > the "non deterministic logic" could be a change from user(eg. changing of > stoppingOffsets in KafkaSource). > > > I think the "non deterministic logic" could cause trouble in combination > with unaligned checkpoint and downstream window operator. Unaligned > checkpoint will persist "MAX_WATERMARK" in state, after restarting, > "MAX_WATERMARK" will cause downstream window operator ignores all future > inputs. > > FLIP-147 demands no new records from end-of-stream-flushing, but source > will emit “MAX_WATERMARK” on ending. Previously, I thought it is not a > valid issue, but turn out that it could cause trouble
[jira] [Created] (FLINK-21523) ArrayIndexOutOfBoundsException occurs while run a hive streaming source job with partitioned table source
zouyunhe created FLINK-21523: Summary: ArrayIndexOutOfBoundsException occurs while run a hive streaming source job with partitioned table source Key: FLINK-21523 URL: https://issues.apache.org/jira/browse/FLINK-21523 Project: Flink Issue Type: Bug Components: Connectors / Hive Affects Versions: 1.12.1 Reporter: zouyunhe We have two Hive tables; the DDL is as below:
{code:java}
-- test.test_5
create table test.test_5 (
  dpi int,
  uid bigint
) partitioned by (day string, hour string) stored as parquet;

-- test.test_3
create table test.test_3 (
  dpi int,
  uid bigint,
  itime timestamp
) stored as parquet;
{code}
Then we add a partition to test_tbl5:
{code:java}
alter table test_tbl5 add partition(day='2021-02-27', hour='12');
{code}
We start a Flink streaming job to read the Hive table test_tbl5 and write the data into test_tbl3. The job's SQL is:
{code:java}
set test_tbl5.streaming-source.enable = true;
insert into hive.test.test_tbl3
select dpi, uid, cast(to_timestamp('2020-08-09 00:00:00') as timestamp(9))
from hive.test.test_tbl5 where `day` = '2021-02-27';
{code}
and we see the exception thrown:
{code:java}
2021-02-28 22:33:16,553 ERROR org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext - Exception while handling result from async call in SourceCoordinator-Source: HiveSource-test.test_tbl5.
Triggering job failover.
org.apache.flink.connectors.hive.FlinkHiveException: Failed to enumerate files
    at org.apache.flink.connectors.hive.ContinuousHiveSplitEnumerator.handleNewSplits(ContinuousHiveSplitEnumerator.java:152) ~[flink-connector-hive_2.12-1.12.1.jar:1.12.1]
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$4(ExecutorNotifier.java:136) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) [flink-dist_2.12-1.12.1.jar:1.12.1]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_60]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_60]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
    at org.apache.flink.connectors.hive.util.HivePartitionUtils.toHiveTablePartition(HivePartitionUtils.java:184) ~[flink-connector-hive_2.12-1.12.1.jar:1.12.1]
    at org.apache.flink.connectors.hive.HiveTableSource$HiveContinuousPartitionFetcherContext.toHiveTablePartition(HiveTableSource.java:417) ~[flink-connector-hive_2.12-1.12.1.jar:1.12.1]
    at org.apache.flink.connectors.hive.ContinuousHiveSplitEnumerator$PartitionMonitor.call(ContinuousHiveSplitEnumerator.java:237) ~[flink-connector-hive_2.12-1.12.1.jar:1.12.1]
    at org.apache.flink.connectors.hive.ContinuousHiveSplitEnumerator$PartitionMonitor.call(ContinuousHiveSplitEnumerator.java:177) ~[flink-connector-hive_2.12-1.12.1.jar:1.12.1]
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$5(ExecutorNotifier.java:133) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_60]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[?:1.8.0_60]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_60]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[?:1.8.0_60]
    ... 3 more
{code}
It seems the partitioned field is not found in the source table's field list.
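The `-1` in the `Caused by` line is the classic signature of a failed name lookup being used directly as an array index. A minimal illustration of the failure mode (hypothetical code, not the actual `HivePartitionUtils` implementation):

```java
import java.util.Arrays;
import java.util.List;

public class MissingPartitionColumn {

    /** Illustrative: look up a partition column in the reader's field list. */
    static int partitionIndex(List<String> fieldNames, String key) {
        return fieldNames.indexOf(key); // -1 when the column is missing
    }

    public static void main(String[] args) {
        // The partition columns ('day', 'hour') are missing from the schema the reader sees.
        List<String> fieldNames = Arrays.asList("dpi", "uid");
        int idx = partitionIndex(fieldNames, "day"); // -1
        Object[] row = new Object[fieldNames.size()];
        try {
            System.out.println(row[idx]); // same failure mode as the stack trace
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("ArrayIndexOutOfBoundsException for index " + idx);
        }
    }
}
```

This matches the report's observation: the partition columns of the table are not present in the field list the split enumerator works with, so the lookup returns `-1` and the subsequent array access throws.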
Re: [VOTE] Release 1.12.2, release candidate #2
Hey Roman, Thank you very much for preparing RC2. +1 from my side. 1. Verified checksums and GPG signatures. 2. Verified that the source archives do not contain any binaries. 3. Successfully built the source with Maven. 4. Started a local Flink cluster, ran the streaming WordCount example with the WebUI, checked the output and JM/TM logs; no suspicious output/log. 5. Repeated step 4 with the binary release as well; no suspicious output/log. 6. Checked the source and binary releases to make sure both an Apache License file and a NOTICE file are included. 7. Manually verified that there are no pom file changes between 1.12.2-rc1 and 1.12.2-rc2; no obvious license problem. 8. Reviewed the release PR for RC2 updates and double-confirmed the change list for 1.12.2. Best, Yuan On Sat, Feb 27, 2021 at 7:19 AM Roman Khachatryan wrote: > Hi everyone, > > Please review and vote on the release candidate #2 for the version 1.12.2, > as follows: > [ ] +1, Approve the release > [ ] -1, Do not approve the release (please provide specific comments) > > > The complete staging area is available for your review, which includes: > * JIRA release notes [1], > * the official Apache source release and binary convenience releases to be > deployed to dist.apache.org [2], which are signed with the key with > fingerprint 0D545F264D2DFDEBFD4E038F97B4625E2FCF517C [3], > * all artifacts to be deployed to the Maven Central Repository [4], > * source code tag "release-1.12.2-rc2" [5], > * website pull request listing the new release and adding announcement blog > post [6]. > > The vote will be open for at least 72 hours. It is adopted by majority > approval, with at least 3 PMC affirmative votes.
> > [1] > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12349502=12315522 > [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.12.2-rc2/ > [3] https://dist.apache.org/repos/dist/release/flink/KEYS > [4] > https://repository.apache.org/content/repositories/orgapacheflink-1414/ > [5] https://github.com/apache/flink/releases/tag/release-1.12.2-rc2 > [6] https://github.com/apache/flink-web/pull/418 > > Regards, > Roman >
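The checksum half of step 1 in the verification checklist above boils down to recomputing the SHA-512 digest of the downloaded artifact and comparing it with the published `.sha512` file (signature verification additionally needs `gpg --verify` against the KEYS file). A stdlib sketch with stand-in bytes; in a real run, the bytes come from the downloaded tarball and `published` from the checksum file next to it:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class ChecksumCheck {

    /** Hex-encoded SHA-512 digest of the given bytes. */
    static String sha512Hex(byte[] data) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-512").digest(data);
            StringBuilder sb = new StringBuilder();
            for (byte b : digest) {
                sb.append(String.format("%02x", b));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // SHA-512 is guaranteed on every JRE
        }
    }

    public static void main(String[] args) {
        // Stand-in for the downloaded release artifact's bytes.
        byte[] artifact = "flink-1.12.2-bin".getBytes(StandardCharsets.UTF_8);
        // In practice this value is read from the published .sha512 file.
        String published = sha512Hex(artifact);
        System.out.println(sha512Hex(artifact).equals(published) ? "OK" : "MISMATCH");
    }
}
```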
Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
I think you are right about the problem of endOfInput. endOfInput should not be used to commit final results. In fact, if this termination fails, then we might end up with a different outcome of the job which is equally valid as the one before the failure. Concerning unaligned checkpoints, I think they don't play well together with draining a streaming pipeline. The problem is that in the draining case you want to process all records which are still in flight, but unaligned checkpoints don't guarantee this as they can jump over in-flight records. I think the MAX_WATERMARK event should be sent either just before or with the EndOfPartitionEvent. If we ensure that the MAX_WATERMARK is only persisted in state if a recovery will immediately trigger the shutdown of this operator, then it shouldn't be an issue. Cheers, Till On Sun, Feb 28, 2021 at 9:06 AM Kezhu Wang wrote: > Hi Till, > > Just for bookkeeping, some observations from current implementation. > > > With this model, the final checkpoint is quite simple because it is > ingrained in the lifecycle of an operator. Differently said an operator > will only terminate after it has committed its side effects and seen the > notifyCheckpointComplete message (if it is stateful). > > Currently, we could not mark this operator(or subtask) as terminated since > result of `notifyCheckpointComplete`(possible side effect committing) is > not taken into account of the belonging checkpoint. The job has to run to > next safe point(finished or next checkpoint success) to be marked as > “terminated”. > > > How to implement the stop-with-savepoint --drain/terminate command with > this model: One idea could be to tell the sources that they should stop > reading. This should trigger the EndOfPartitionEvent to be sent > downstream. > This will transition all operators into the TERMINATING state. > > Currently, EndOfPartitionEvent is only issued after StreamTask.invoke > returned.
To achieve above, possible works should be required: > * Promote `EndOfPartitionEvent` from `Task` to `StreamTask`. This may > have some interferences with BatchTask or network io stack. > * Or introducing stream task level `EndOfUserRecordsEvent`(from PR#14831 > @Yun @Piotr) > * Or special handling of `CheckpointType.SAVEPOINT_TERMINATE`. > > Besides this, I would like to quote some discussion from FLINK-21467 > between @Piotr and me: > > From @Piotr > > Note, that it's not only that endOfInput can be called multiple times. > There is a very remote possibility that the following scenario will happen: > 1. checkpoint is taken (successfully) > 2. sources are finishing > 3. endOfInput is issued > 4. job fails > 5. job restarts to checkpoint 1. > 6. after failover, because of some non deterministic logic in the source, > sources are not finishing > > From me > > But I think there is little work Flink can do to cope with this kind of > issues. The checkpoint could be a savepoint triggered from user side and > the "non deterministic logic" could be a change from user(eg. changing of > stoppingOffsets in KafkaSource). > > > I think the "non deterministic logic" could cause trouble in combination > with unaligned checkpoint and downstream window operator. Unaligned > checkpoint will persist "MAX_WATERMARK" in state, after restarting, > "MAX_WATERMARK" will cause downstream window operator ignores all future > inputs. > > FLIP-147 demands no new records from end-of-stream-flushing, but source > will emit “MAX_WATERMARK” on ending. Previously, I thought it is not a > valid issue, but turn out that it could cause trouble under scenario listed > by @Piotr if I am not wrong. > > > PR#14831: https://github.com/apache/flink/pull/14831 > FLINK-21467: https://issues.apache.org/jira/browse/FLINK-21467 > > > Best, > Kezhu Wang > > On February 27, 2021 at 18:12:20, Till Rohrmann (trohrm...@apache.org) > wrote: > > Thanks for all your thoughts. 
I think we should further think through > whether to allow checkpoints after an operator has emitted all its records > (e.g. after close is called currently) or not. I think by doing this we > would nicely decouple the checkpoint taking from the operator lifecycle > and > wouldn't need special checkpoints/savepoints for the final checkpoint and > stop-with-savepoint --drain. Let me try to explain this a bit more > detailed. > > If we say an operator has the rough lifecycle RUNNING => TERMINATING => > TERMINATED where we go from RUNNING into TERMINATING after we have seen > the > EndOfPartitionEvent and flushed all our records. The operator goes from > TERMINATING => TERMINATED if it has persisted all its possible side > effects. Throughout all states, it is possible to trigger a checkpoint. A > stateless operator will immediately go from TERMINATING to TERMINATED > whereas a stateful operator would wait for another checkpoint to be > triggered and successfully completed (notifyCheckpointComplete). > > With this model, the final checkpoint is quite simple because it is > ingrained in
[jira] [Created] (FLINK-21522) Iterative stream could not work with stop-with-savepoint
Kezhu Wang created FLINK-21522: -- Summary: Iterative stream could not work with stop-with-savepoint Key: FLINK-21522 URL: https://issues.apache.org/jira/browse/FLINK-21522 Project: Flink Issue Type: Bug Components: Runtime / Task Affects Versions: 1.12.1, 1.11.3, 1.13.0 Reporter: Kezhu Wang A user reported this on the user mailing list: [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Stateful-functions-2-2-and-stop-with-savepoint-td41772.html] I copied the full mail body here: {quote} I have an embedded function with a SinkFunction as an egress, implemented as this pseudo-code: val serializationSchema = KafkaSchemaSerializationSchema(... props required to use a Confluent Schema Registry with Avro, auth etc ...) return SinkFunctionSpec(EGRESS_ID, FlinkKafkaProducer(serializationSchema, props, AT_LEAST_ONCE)) Checkpointing and taking a savepoint without stopping work as expected. However, when I run "flink stop " or even "flink stop --drain ", the operation never completes, reporting IN_PROGRESS until I hit the "failure-cause: org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired before completing" CompletedException. In the "Checkpoint History" it shows only 2 of my 3 operators completed their work: Source: my-ingress-ingress -> router (my-ingress) | acknowledge: 1/1 (100%) | end-to-end duration: 638ms | data-size 1.38 KB feedback-union -> functions -> Sink: my-egress-egress | acknowledge 0/1 0% | end-to-end duration: n/a | data-size: n/a feedback | acknowledge: 1/1 (100%) | end-to-end duration: 626ms | data-size: 0 B I've been unable to gain any insights from logs so far. Thoughts? {quote} I think this is what we overlooked in the evaluation of [apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033], FLINK-21132, and FLINK-21133. I think the problem is twofold in the current implementation: # {{StreamIterationHead}} does not finish itself.
# There is a local feedback from {{StreamIterationTail}} to {{StreamIterationHead}}, which could cause {{StreamIterationTail}} to block after {{StreamIterationHead}} has finished. Globally speaking, it is a loop. [~pnowojski] emphasized this in FLINK-21132 and FLINK-21133.
Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
Hi Till, Just for bookkeeping, some observations from the current implementation. > With this model, the final checkpoint is quite simple because it is ingrained in the lifecycle of an operator. Differently said an operator will only terminate after it has committed its side effects and seen the notifyCheckpointComplete message (if it is stateful). Currently, we cannot mark this operator (or subtask) as terminated, since the result of `notifyCheckpointComplete` (possible side-effect committing) is not taken into account by the corresponding checkpoint. The job has to run to the next safe point (finishing, or the next successful checkpoint) to be marked as “terminated”. > How to implement the stop-with-savepoint --drain/terminate command with this model: One idea could be to tell the sources that they should stop reading. This should trigger the EndOfPartitionEvent to be sent downstream. This will transition all operators into the TERMINATING state. Currently, EndOfPartitionEvent is only issued after StreamTask.invoke has returned. To achieve the above, the following work may be required: * Promote `EndOfPartitionEvent` from `Task` to `StreamTask`. This may have some interference with BatchTask or the network IO stack. * Or introduce a stream-task-level `EndOfUserRecordsEvent` (from PR#14831 @Yun @Piotr) * Or handle `CheckpointType.SAVEPOINT_TERMINATE` specially. Besides this, I would like to quote some discussion from FLINK-21467 between @Piotr and me: From @Piotr > Note, that it's not only that endOfInput can be called multiple times. There is a very remote possibility that the following scenario will happen: 1. checkpoint is taken (successfully) 2. sources are finishing 3. endOfInput is issued 4. job fails 5. job restarts to checkpoint 1. 6. after failover, because of some non deterministic logic in the source, sources are not finishing From me > But I think there is little work Flink can do to cope with this kind of issue.
The checkpoint could be a savepoint triggered from the user side, and the "non deterministic logic" could be a change from the user (e.g., changing stoppingOffsets in KafkaSource). > I think the "non deterministic logic" could cause trouble in combination with unaligned checkpoints and a downstream window operator. An unaligned checkpoint will persist "MAX_WATERMARK" in state; after restarting, "MAX_WATERMARK" will cause the downstream window operator to ignore all future inputs. FLIP-147 demands no new records from end-of-stream flushing, but the source will emit “MAX_WATERMARK” on ending. Previously I thought this was not a valid issue, but it turns out it could cause trouble under the scenario listed by @Piotr, if I am not wrong. PR#14831: https://github.com/apache/flink/pull/14831 FLINK-21467: https://issues.apache.org/jira/browse/FLINK-21467 Best, Kezhu Wang On February 27, 2021 at 18:12:20, Till Rohrmann (trohrm...@apache.org) wrote: Thanks for all your thoughts. I think we should further think through whether to allow checkpoints after an operator has emitted all its records (e.g. after close is called currently) or not. I think by doing this we would nicely decouple the checkpoint taking from the operator lifecycle and wouldn't need special checkpoints/savepoints for the final checkpoint and stop-with-savepoint --drain. Let me try to explain this in a bit more detail. If we say an operator has the rough lifecycle RUNNING => TERMINATING => TERMINATED, where we go from RUNNING into TERMINATING after we have seen the EndOfPartitionEvent and flushed all our records, then the operator goes from TERMINATING => TERMINATED once it has persisted all its possible side effects. Throughout all states, it is possible to trigger a checkpoint. A stateless operator will immediately go from TERMINATING to TERMINATED, whereas a stateful operator would wait for another checkpoint to be triggered and successfully completed (notifyCheckpointComplete).
With this model, the final checkpoint is quite simple because it is ingrained in the lifecycle of an operator. Differently said, an operator will only terminate after it has committed its side effects and seen the notifyCheckpointComplete message (if it is stateful). Here it is important to note that in the streaming case, different bounded operators can terminate at different times. They don't all have to terminate with the same checkpoint. How to implement the stop-with-savepoint --drain/terminate command with this model: One idea could be to tell the sources that they should stop reading. This should trigger the EndOfPartitionEvent to be sent downstream. This will transition all operators into the TERMINATING state. Next, the JM can trigger a checkpoint to shut the operators down, rather than waiting for the next automatic checkpoint trigger event. By allowing checkpoints throughout the entire lifecycle of an operator, we disallow sending records from notifyCheckpointComplete, because this message will also be sent in the TERMINATING state, where an operator has already produced all of its records. What do
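The lifecycle Till describes can be sketched as a small state machine. This is an illustrative model only, not Flink's task code: every operator moves RUNNING -> TERMINATING on the EndOfPartitionEvent; a stateless operator terminates immediately, while a stateful one waits in TERMINATING for a notifyCheckpointComplete that confirms its side effects are committed.

```java
public class OperatorLifecycle {

    enum State { RUNNING, TERMINATING, TERMINATED }

    private final boolean stateful;
    private State state = State.RUNNING;

    OperatorLifecycle(boolean stateful) {
        this.stateful = stateful;
    }

    State state() {
        return state;
    }

    /** EndOfPartitionEvent seen and all records flushed. */
    void onEndOfPartition() {
        state = State.TERMINATING;
        if (!stateful) {
            state = State.TERMINATED; // nothing to commit, terminate right away
        }
    }

    /** A checkpoint completed after the operator entered TERMINATING. */
    void onNotifyCheckpointComplete() {
        if (state == State.TERMINATING) {
            state = State.TERMINATED; // side effects are now committed
        }
    }

    public static void main(String[] args) {
        OperatorLifecycle stateless = new OperatorLifecycle(false);
        stateless.onEndOfPartition();
        System.out.println(stateless.state()); // TERMINATED

        OperatorLifecycle stateful = new OperatorLifecycle(true);
        stateful.onEndOfPartition();
        System.out.println(stateful.state()); // TERMINATING
        stateful.onNotifyCheckpointComplete();
        System.out.println(stateful.state()); // TERMINATED
    }
}
```

In this model, stop-with-savepoint --drain amounts to pushing every operator to `onEndOfPartition` and then triggering one more checkpoint so the stateful ones receive their final `onNotifyCheckpointComplete`.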