Re: [DISCUSS] FLIP-162: Consistent Flink SQL time function behavior

2021-02-28 Thread Timo Walther
And btw, it is interesting to note that AWS seems to take the approach 
that I suggested first.


All functions are SQL standard compliant, and only dedicated functions 
such as CURRENT_ROW_TIMESTAMP deviate from the standard.


Regards,
Timo

On 01.03.21 08:45, Timo Walther wrote:
How about we simply go for your first approach by having [query-start, 
row, auto] as configuration parameters where [auto] is the default?


This sounds like a good consensus where everyone is happy, no?

This also allows users to restore the old per-row behavior for all 
functions that we had before Flink 1.13.


Regards,
Timo


On 26.02.21 11:10, Leonard Xu wrote:

Thanks Joe for the great investigation.


• Generally urging for semantics (batch > time of first query issued, 
streaming > row level).

I discussed this with Timo & Stephan:
• It seems to go towards a config parameter, either [query-start, row] 
or [query-start, row, auto], and what is the default?
• The main question seems to be: are we pushing the default towards 
streaming? (probably related to the insert-into behaviour in the SQL Client)



It looks like the opinions in this thread and the user input agree that 
batch should use the time of the first query and streaming should use row level.
Based on this, we should keep row level for streaming and query start 
for batch, just like the config parameter value [auto].


Currently Flink uses row-level evaluation for time functions in both batch 
and streaming jobs, thus we only need to update the behavior in batch.


I tend not to expose an obscure configuration to users, especially as it 
is semantics-related.


1. We can make [auto] the default agreement: current Flink streaming 
users will feel nothing has changed, and current Flink batch users will 
feel that Flink batch is aligned with other good batch engines as well 
as the SQL standard. We can also provide a function 
CURRENT_ROW_TIMESTAMP [1] for Flink batch users who want row-level time 
functions.


2. CURRENT_ROW_TIMESTAMP can also be used in Flink streaming; it has 
clear semantics, and we can encourage users to use it.
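
To make this concrete, here is a minimal sketch of how the proposal could 
look from the Table API. The option key `table.exec.time-function-evaluation` 
is purely illustrative and not a decided name, and CURRENT_ROW_TIMESTAMP is 
the proposed function from [1], not something that exists in Flink today:

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TimeFunctionEvaluationSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Hypothetical option key; the value set [query-start, row, auto] comes
        // from this thread, but the key name itself is NOT final.
        tEnv.getConfig().getConfiguration()
                .setString("table.exec.time-function-evaluation", "auto");

        // With [auto]: per-row evaluation in streaming (today's behavior),
        // query-start evaluation in batch.
        tEnv.executeSql(
                "SELECT id, CURRENT_TIMESTAMP AS ts FROM (VALUES (1), (2)) AS t(id)")
            .print();

        // The proposed dedicated function would keep row-level semantics everywhere,
        // e.g.: SELECT id, CURRENT_ROW_TIMESTAMP FROM t   (modeled after [1])
    }
}
{code}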


In this way, we don't have to introduce an obscure configuration 
prematurely while making all users happy.


What do you think?

Best,
Leonard
[1] 
https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-current-row-timestamp.html 






Hope this helps,

Thanks,
Joe


On 19.02.2021, at 10:25, Leonard Xu  wrote:

Hi, Joe

Thanks for volunteering to investigate the user data on this topic. Do you
have any progress here?

Thanks,
Leonard

On Thu, Feb 4, 2021 at 3:08 PM Johannes Moser 
 wrote:



Hello,

I will work with some users to get data on that.

Thanks, Joe


On 03.02.2021, at 14:58, Stephan Ewen  wrote:

Hi all!

A quick thought on this thread: We see a typical stalemate here, as in so
many discussions recently.
One developer prefers it this way, another one another way. Both have
pro/con arguments, it takes a lot of time from everyone, and still there is
little progress in the discussion.

Ultimately, this can only be decided by talking to the users. And it
would also be the best way to ensure that what we build is the intuitive
and expected way for users.
The less the users are into the deep aspects of Flink SQL, the better they
can mirror what a common user would expect (a power user will figure it
out anyway).
Let's find a person to drive that, spell it out in the FLIP as "semantics
TBD", and focus on the implementation of the parts that are agreed upon.


For interviewing the users, here are some ideas for questions to look at:

- How do they view the trade-off between stable semantics vs.
out-of-the-box magic (faster getting started).
- How comfortable are they with the different meaning of "now()" in
a streaming versus batch context.
- What would be their expectation when moving a query with the time
functions ("now()") from an unbounded stream (Kafka source without end
offset) to a bounded stream (Kafka source with end offsets), which may
switch execution to batch.

Best,
Stephan


On Tue, Feb 2, 2021 at 3:19 PM Jark Wu  wrote:


Hi Fabian,

I think we have an agreement that the functions should be evaluated at
query start in batch mode, because all the other batch systems and
traditional databases behave this way, which is standard SQL compliant.

*1. The different point of view is what's the behavior in streaming mode?*


From my point of view, I don't see any meaningful use in evaluating at
query-start for a 365-day long-running streaming job.
And from my observation, CURRENT_TIMESTAMP is heavily used by Flink
streaming users and they expect the current behavior.
The SQL standard only provides a guideline for traditional batch systems;
however, Flink is a leading stream processing system, which is out of the
scope of the SQL standard, and Flink should define the streaming standard.
I think a standard should follow users' intuition.
Therefore, I think we don't need to be standard SQL compliant at this
point because users don't expect it.

Re: [DISCUSS] Apache Flink Jira Process

2021-02-28 Thread Xintong Song
Thanks for driving this discussion, Konstantin.

I like the idea of having a bot remind the reporter/assignee/watchers about
inactive tickets and, if needed, downgrade/close them automatically.

My two cents:
We may have labels like "downgraded-by-bot" / "closed-by-bot", so that it's
easier to filter and review tickets updated by the bot.
We may want to review such tickets (e.g., monthly) in case a valid ticket
failed to draw the attention of relevant committers and the reporter
doesn't know who to ping.

Thank you~

Xintong Song



On Sat, Feb 27, 2021 at 1:37 AM Till Rohrmann  wrote:

> Thanks for starting this discussion Konstantin. I like your proposal and
> also the idea of automating the tedious parts of it via a bot.
>
> Cheers,
> Till
>
> On Fri, Feb 26, 2021 at 4:17 PM Konstantin Knauf 
> wrote:
>
> > Dear Flink Community,
> >
> > I would like to start a discussion on improving and to some extent simply
> > defining the way we work with Jira. Some aspects have been discussed a
> > while back [1], but I would like to go a bit beyond that with the
> following
> > goals in mind:
> >
> >
> >    - clearer communication and expectation management with the community
> >       - a user or contributor should be able to judge the urgency of a
> >         ticket by its priority
> >       - if a ticket is assigned to someone, the expectation that someone
> >         is working on it should hold
> >    - generally reduce noise in Jira
> >    - reduce overhead of committers to ask about status updates of
> >      contributions or bug reports
> >       - “Are you still working on this?”
> >       - “Are you still interested in this?”
> >       - “Does this still happen on Flink 1.x?”
> >       - “Are you still experiencing this issue?”
> >       - “What is the status of the implementation?”
> >    - while still encouraging users to add new tickets and to leave feedback
> >      about existing tickets
> >
> >
> > Please see the full proposal here:
> >
> >
> https://docs.google.com/document/d/19VmykDSn4BHgsCNTXtN89R7xea8e3cUIl-uivW8L6W8/edit#
> > .
> >
> > The idea would be to discuss this proposal in this thread. If we come to
> a
> > conclusion, I'd document the proposal in the wiki [2] and we would then
> > vote on it (approval by "Lazy Majority").
> >
> > Cheers,
> >
> > Konstantin
> >
> > [1]
> >
> >
> https://lists.apache.org/thread.html/rd34fb695d371c2bf0cbd1696ce190bac35dd78f29edd8c60d0c7ee71%40%3Cdev.flink.apache.org%3E
> > [2]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLINK+Jira+field+definitions
> >
> > --
> >
> > Konstantin Knauf
> >
> > https://twitter.com/snntrable
> >
> > https://github.com/knaufk
> >
>


Re: [DISCUSS] FLIP-162: Consistent Flink SQL time function behavior

2021-02-28 Thread Timo Walther
How about we simply go for your first approach by having [query-start, 
row, auto] as configuration parameters where [auto] is the default?


This sounds like a good consensus where everyone is happy, no?

This also allows users to restore the old per-row behavior for all 
functions that we had before Flink 1.13.


Regards,
Timo


On 26.02.21 11:10, Leonard Xu wrote:

Thanks Joe for the great investigation.



• Generally urging for semantics (batch > time of first query issued, 
streaming > row level).
I discussed this with Timo & Stephan:
• It seems to go towards a config parameter, either [query-start, row] 
or [query-start, row, auto], and what is the default?
• The main question seems to be: are we pushing the default towards 
streaming? (probably related to the insert-into behaviour in the SQL Client)



It looks like the opinions in this thread and the user input agree that batch
should use the time of the first query and streaming should use row level.
Based on this, we should keep row level for streaming and query start for
batch, just like the config parameter value [auto].

Currently Flink uses row-level evaluation for time functions in both batch and
streaming jobs, thus we only need to update the behavior in batch.

I tend not to expose an obscure configuration to users, especially as it is
semantics-related.

1. We can make [auto] the default agreement: current Flink streaming users
will feel nothing has changed, and current Flink batch users will feel that
Flink batch is aligned with other good batch engines as well as the SQL
standard. We can also provide a function CURRENT_ROW_TIMESTAMP [1] for
Flink batch users who want row-level time functions.

2. CURRENT_ROW_TIMESTAMP can also be used in Flink streaming; it has clear
semantics, and we can encourage users to use it.

In this way, we don't have to introduce an obscure configuration prematurely
while making all users happy.

What do you think?

Best,
Leonard
[1] 
https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-current-row-timestamp.html




Hope this helps,

Thanks,
Joe


On 19.02.2021, at 10:25, Leonard Xu  wrote:

Hi, Joe

Thanks for volunteering to investigate the user data on this topic. Do you
have any progress here?

Thanks,
Leonard

On Thu, Feb 4, 2021 at 3:08 PM Johannes Moser  wrote:


Hello,

I will work with some users to get data on that.

Thanks, Joe


On 03.02.2021, at 14:58, Stephan Ewen  wrote:

Hi all!

A quick thought on this thread: We see a typical stalemate here, as in so
many discussions recently.
One developer prefers it this way, another one another way. Both have
pro/con arguments, it takes a lot of time from everyone, still there is
little progress in the discussion.

Ultimately, this can only be decided by talking to the users. And it
would also be the best way to ensure that what we build is the intuitive
and expected way for users.
The less the users are into the deep aspects of Flink SQL, the better

they

can mirror what a common user would expect (a power user will figure it
out anyway).
Let's find a person to drive that, spell it out in the FLIP as "semantics
TBD", and focus on the implementation of the parts that are agreed upon.

For interviewing the users, here are some ideas for questions to look at:
- How do they view the trade-off between stable semantics vs.
out-of-the-box magic (faster getting started).
- How comfortable are they with the different meaning of "now()" in
a streaming versus batch context.
- What would be their expectation when moving a query with the time
functions ("now()") from an unbounded stream (Kafka source without end
offset) to a bounded stream (Kafka source with end offsets), which may
switch execution to batch.

Best,
Stephan


On Tue, Feb 2, 2021 at 3:19 PM Jark Wu  wrote:


Hi Fabian,

I think we have an agreement that the functions should be evaluated at
query start in batch mode, because all the other batch systems and
traditional databases behave this way, which is standard SQL compliant.

*1. The different point of view is what's the behavior in streaming mode?*


From my point of view, I don't see any meaningful use in evaluating at
query-start for a 365-day long-running streaming job.
And from my observation, CURRENT_TIMESTAMP is heavily used by Flink
streaming users and they expect the current behavior.
The SQL standard only provides a guideline for traditional batch systems;
however, Flink is a leading stream processing system, which is out of the
scope of the SQL standard, and Flink should define the streaming standard.
I think a standard should follow users' intuition.
Therefore, I think we don't need to be standard SQL compliant at this
point because users don't expect it.
Changing the behavior of the functions to evaluate at query start in
streaming mode would hurt most Flink SQL users and gain us nothing,
so we should avoid this.

*2. Does it break the unified streaming-batch semantics? *

I don't think so. First of 

Re: [DISCUSS]FLIP-163: SQL Client Improvements

2021-02-28 Thread Timo Walther
We could also think about reading this config option in Table API. The 
effect would be to call `await()` directly in an execute call. I could 
also imagine this to be useful esp. when you fire a lot of insert into 
queries. We had the case before that users where confused that the 
execution happens asynchronously, such an option could prevent this to 
happen again.
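
As a rough illustration of the difference such an option would make (a sketch 
only; the key `table.dml-sync` is still under discussion, and the connectors 
used here are just placeholders):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

public class DmlSyncSketch {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        tEnv.executeSql("CREATE TABLE src (x INT) WITH ("
                + "'connector' = 'datagen', 'number-of-rows' = '10')");
        tEnv.executeSql("CREATE TABLE snk (x INT) WITH ('connector' = 'blackhole')");

        // Today: executeSql submits the INSERT asynchronously and returns immediately;
        // the caller must block explicitly if sequential execution is needed.
        TableResult result = tEnv.executeSql("INSERT INTO snk SELECT x FROM src");
        result.await();

        // With the discussed option (name not final), the await() above would be
        // implicit, e.g.:
        // tEnv.getConfig().getConfiguration().setBoolean("table.dml-sync", true);
    }
}
{code}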


Regards,
Timo

On 01.03.21 05:14, Kurt Young wrote:

I also asked some users for their opinion on introducing a config prefixed
with "table" that has no effect on methods in the Table API and SQL. All of
them were kind of shocked by such a question, asking why we would do
anything like this.

This kind of reaction actually doesn't surprise me a lot, so I jumped in
and challenged this config option even after the FLIP had already been
accepted.

If we only have to define the execution behavior for multiple statements in
the SQL Client, we should only introduce a config option whose name tells
users its scope of effect. Prefixing it with "table" is definitely not a
good idea here.

Best,
Kurt


On Fri, Feb 26, 2021 at 9:39 PM Leonard Xu  wrote:


Hi, all

Looks like there’s only one divergence about the option [ table | sql-client
].dml-sync in this thread, correct me if I’m wrong.

1. Leaving the context of this thread, from a user's perspective,
the table.xx configurations should take effect in Table API & SQL, and
the sql-client.xx configurations should only take effect in the SQL Client.
In my (the user's) opinion, other explanations are counterintuitive.

2. It should be pointed out that both the existing table.xx configurations
like table.exec.state.ttl, table.optimizer.agg-phase-strategy,
table.local-time-zone, etc., and the proposed sql-client.xx configurations
like sql-client.verbose, sql-client.execution.max-table-result.rows
comply with this convention.

3. Considering the portability to support different CLI tools (sql-client,
sql-gateway, etc.), I prefer table.dml-sync.

In addition, I think sql-client/sql-gateway/other CLI tools can be placed
out of flink-table module even in an external project, this should not
affect our conclusion.


Hope this can help you.


Best,
Leonard




在 2021年2月25日,18:51,Shengkai Fang  写道:

Hi, everyone.

Here is a summary of the discussion about the option. If the summary
has errors, please correct me.

`table.dml-sync`:
- take effect for `executeMultiSql` and sql client
- benefit: SQL script portability. One script for all platforms.
- drawback: Don't work for `TableEnvironment#executeSql`.

`table.multi-dml-sync`:
- take effect for `executeMultiSql` and sql client
- benefit: SQL script portability
- drawback: It's confusing when the SQL script has only one DML statement but
still needs the option `table.multi-dml-sync` to be set

`client.dml-sync`:
- take effect for sql client only
- benefit: clear definition.
- drawback: Every platform needs to define its own option. Bad SQL script
portability.

Just as Jark said, I think `table.dml-sync` is a good choice if we can
extend its scope and make this option work for `executeSql`.
It's straightforward and users can use this option now in the Table API. The
drawback is that `TableResult#await` plays the same role as the option. I
don't think the drawback is really critical because many systems have
commands that play the same role under different names.

Best,
Shengkai

Timo Walther  于2021年2月25日周四 下午4:23写道:


The `table.` prefix is meant to be a general option in the table
ecosystem. Not necessarily attached to Table API or SQL Client. That's
why SQL Client is also located in the `flink-table` module.

My main concern is the SQL script portability. Declaring the sync/async
behavior will happen in many SQL scripts. And users should be able to easily
switch from the SQL Client to some commercial product without needing to
change the script again.

Sure, we can change from `sql-client.dml-sync` to `table.dml-sync` later
but that would mean introducing future confusion. An app name (what
`sql-client` kind of is) should not be part of a config option key if
other apps will need the same kind of option.

Regards,
Timo


On 24.02.21 08:59, Jark Wu wrote:

 From my point of view, I also prefer "sql-client.dml-sync",

because the behavior of this configuration is very clear.
Even if we introduce a new config in the future, e.g. `table.dml-sync`,
we can also deprecate the sql-client one.

Introducing a "table."  configuration without any implementation
will confuse users a lot, as they expect it should take effect on
the Table API.

If we want to introduce a unified "table.dml-sync" option, I prefer
it should be implemented on Table API and affect all the DMLs on
Table API (`tEnv.executeSql`, `Table.executeInsert`, `StatementSet`),
as I have mentioned before [1].


It would be very straightforward that it affects all the DMLs on SQL

CLI

and
TableEnvironment (including `executeSql`, `StatementSet`,
`Table#executeInsert`, etc.).
This can also make SQL CLI easy to support 

[jira] [Created] (FLINK-21532) Make CatalogTableImpl#toProperties and CatalogTableImpl#fromProperties case sensitive

2021-02-28 Thread Lsw_aka_laplace (Jira)
Lsw_aka_laplace created FLINK-21532:
---

 Summary: Make CatalogTableImpl#toProperties and 
CatalogTableImpl#fromProperties case sensitive
 Key: FLINK-21532
 URL: https://issues.apache.org/jira/browse/FLINK-21532
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Reporter: Lsw_aka_laplace






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-02-28 Thread Yun Gao
(Sorry that I repeat this mail since the last one was not added to the same
mailing list thread; very sorry for the inconvenience.)

 Hi all,

Many thanks for all the deep thoughts!

> How to implement the stop-with-savepoint --drain/terminate command with
> this model: One idea could be to tell the sources that they should stop
> reading. This should trigger the EndOfPartitionEvent to be sent
> downstream.
> This will transition all operators into the TERMINATING state.
>
> Currently, EndOfPartitionEvent is only issued after StreamTask.invoke
> returned. To achieve the above, the following work would be required:
> * Promote  `EndOfPartitionEvent` from `Task` to `StreamTask`. This may
> have some interferences with BatchTask or network io stack.
> * Or introducing stream task level `EndOfUserRecordsEvent`(from PR#14831
> @Yun @Piotr)
> * Or special handling of `CheckpointType.SAVEPOINT_TERMINATE`.

I also share Kezhu's concern about whether we need to introduce a new message
to notify the operators of endOfInput/close. The main concerns with reusing
EndOfPartitionEvent are:
1. EndOfPartitionEvent is currently emitted in Task instead of StreamTask, so
we would need some refactoring here.
2. Currently the InputGate/InputChannel would be released after the downstream
tasks have received EndOfPartitionEvent from all the input channels; this
would make the following checkpoints unable to proceed since we could not
emit barriers to the downstream tasks.

Regarding MAX_WATERMARK, I still do not fully understand the issue, since it
seems to me that Flink currently does not snapshot the watermark. If the job
fails over, the window operator would reload all the pending windows before
they were flushed by MAX_WATERMARK, and when the job finishes again, it would
re-emit MAX_WATERMARK?

Best,
Yun



--
From:Kezhu Wang 
Send Time:2021 Mar. 1 (Mon.) 01:26
To:Till Rohrmann 
Cc:Piotr Nowojski ; Guowei Ma ; 
dev ; Yun Gao ; 
jingsongl...@gmail.com 
Subject:Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

In “stop-with-savepoint —drain”, MAX_WATERMARK is not an issue. For normal
finishing task, not allowing unaligned checkpoint does not solve the
problem as MAX_WATERMARK could be persisted in downstream task. When
scenario @Piotr depicted occurs, downstream(or further downstream) window
operator will count all inputs as late.

> If we ensure that the MAX_WATERMARK is only persisted in state if a
recovery will trigger immediately the shut down of this operator, then it
shouldn't be an issue.

You are right in case the assumption holds; I had the same thought as you
before. But I am kind of worried about whether it is too perfect and hence
fragile. This requires a strong guarantee from the implementation that a
recovery from the TERMINATING stage goes directly to that stage.

> I think the MAX_WATERMARK event should be sent either just before or with
the EndOfPartitionEvent.

I prefer “with the EndOfPartitionEvent”. EndOfPartitionEvent itself already
carries what the ending MAX_WATERMARK tries to express. Maybe we can reuse
it? @Piotr

A preview work [1] from @Yun in PR#14831 interprets EndOfPartitionEvent as a
checkpoint barrier if there are pending checkpoints.


[1]:
https://github.com/gaoyunhaii/flink/commit/1aaa80fb0afad13a314b06783c61c01b9414f91b#diff-d4568b34060bc589cd1354bd125c35aca993dd0c9fe9e4460dfed73501116459R177


Best,
Kezhu Wang

On February 28, 2021 at 21:23:31, Till Rohrmann (trohrm...@apache.org)
wrote:

I think you are right with the problem of endOfInput. endOfInput should not
be used to commit final results. In fact if this termination fails then we
might end up in a different outcome of the job which is equally valid as
the one before the failure.

Concerning unaligned checkpoints, I think they don't play well together
with draining a streaming pipeline. The problem is that in the draining
case you want to process all records which are still in flight but
unaligned checkpoints don't guarantee this as they can jump over in-flight
records.

I think the MAX_WATERMARK event should be sent either just before or with
the EndOfPartitionEvent. If we ensure that the MAX_WATERMARK is only
persisted in state if a recovery will trigger immediately the shut down of
this operator, then it shouldn't be an issue.

Cheers,
Till

On Sun, Feb 28, 2021 at 9:06 AM Kezhu Wang  wrote:

> Hi Till,
>
> Just for bookkeeping, some observations from current implementation.
>
> > With this model, the final checkpoint is quite simple because it is
> ingrained in the lifecycle of an operator. Differently said an operator
> will only terminate after it has committed its side effects and seen the
> notifyCheckpointComplete message (if it is stateful).
>
> Currently, we could not mark this operator(or subtask) as terminated since
> result of `notifyCheckpointComplete`(possible side effect committing) is
> not taken into account of the belonging checkpoint. The job 

Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-02-28 Thread Yun Gao
  Hi all

Many thanks for all the deep thoughts!

> How to implement the stop-with-savepoint --drain/terminate command with
> this model: One idea could be to tell the sources that they should stop
> reading. This should trigger the EndOfPartitionEvent to be sent
> downstream.
> This will transition all operators into the TERMINATING state.
>
> Currently, EndOfPartitionEvent is only issued after StreamTask.invoke
> returned. To achieve the above, the following work would be required:
> * Promote  `EndOfPartitionEvent` from `Task` to `StreamTask`. This may
> have some interferences with BatchTask or network io stack.
> * Or introducing stream task level `EndOfUserRecordsEvent`(from PR#14831
> @Yun @Piotr)
> * Or special handling of `CheckpointType.SAVEPOINT_TERMINATE`.

I also share Kezhu's concern about whether we need to introduce a new message
to notify the operators of endOfInput/close. The main concerns with reusing
EndOfPartitionEvent are:
1. EndOfPartitionEvent is currently emitted in Task instead of StreamTask, so
we would need some refactoring here.
2. Currently the InputGate/InputChannel would be released after the downstream
tasks have received EndOfPartitionEvent from all the input channels; this
would make the following checkpoints unable to proceed since we could not
emit barriers to the downstream tasks.

Regarding MAX_WATERMARK, I still do not fully understand the issue, since it
seems to me that Flink currently does not snapshot the watermark. If the job
fails over, the window operator would reload all the pending windows before
they were flushed by MAX_WATERMARK, and when the job finishes again, it would
re-emit MAX_WATERMARK?

Best,
Yun



--
From:Kezhu Wang 
Send Time:2021 Mar. 1 (Mon.) 01:26
To:Till Rohrmann 
Cc:Piotr Nowojski ; Guowei Ma ; 
dev ; Yun Gao ; 
jingsongl...@gmail.com 
Subject:Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

In “stop-with-savepoint —drain”, MAX_WATERMARK is not an issue. For normal
finishing task, not allowing unaligned checkpoint does not solve the
problem as MAX_WATERMARK could be persisted in downstream task. When
scenario @Piotr depicted occurs, downstream(or further downstream) window
operator will count all inputs as late.

> If we ensure that the MAX_WATERMARK is only persisted in state if a
recovery will trigger immediately the shut down of this operator, then it
shouldn't be an issue.

You are right in case the assumption holds; I had the same thought as you
before. But I am kind of worried about whether it is too perfect and hence
fragile. This requires a strong guarantee from the implementation that a
recovery from the TERMINATING stage goes directly to that stage.

> I think the MAX_WATERMARK event should be sent either just before or with
the EndOfPartitionEvent.

I prefer “with the EndOfPartitionEvent”. EndOfPartitionEvent itself already
carries what the ending MAX_WATERMARK tries to express. Maybe we can reuse
it? @Piotr

A preview work [1] from @Yun in PR#14831 interprets EndOfPartitionEvent as a
checkpoint barrier if there are pending checkpoints.


[1]:
https://github.com/gaoyunhaii/flink/commit/1aaa80fb0afad13a314b06783c61c01b9414f91b#diff-d4568b34060bc589cd1354bd125c35aca993dd0c9fe9e4460dfed73501116459R177


Best,
Kezhu Wang

On February 28, 2021 at 21:23:31, Till Rohrmann (trohrm...@apache.org)
wrote:

I think you are right with the problem of endOfInput. endOfInput should not
be used to commit final results. In fact if this termination fails then we
might end up in a different outcome of the job which is equally valid as
the one before the failure.

Concerning unaligned checkpoints, I think they don't play well together
with draining a streaming pipeline. The problem is that in the draining
case you want to process all records which are still in flight but
unaligned checkpoints don't guarantee this as they can jump over in-flight
records.

I think the MAX_WATERMARK event should be sent either just before or with
the EndOfPartitionEvent. If we ensure that the MAX_WATERMARK is only
persisted in state if a recovery will trigger immediately the shut down of
this operator, then it shouldn't be an issue.

Cheers,
Till

On Sun, Feb 28, 2021 at 9:06 AM Kezhu Wang  wrote:

> Hi Till,
>
> Just for bookkeeping, some observations from current implementation.
>
> > With this model, the final checkpoint is quite simple because it is
> ingrained in the lifecycle of an operator. Differently said an operator
> will only terminate after it has committed its side effects and seen the
> notifyCheckpointComplete message (if it is stateful).
>
> Currently, we could not mark this operator(or subtask) as terminated since
> result of `notifyCheckpointComplete`(possible side effect committing) is
> not taken into account of the belonging checkpoint. The job has to run to
> next safe point(finished or next checkpoint success) to be marked as
> “terminated”.
>
> > How to implement the 

[jira] [Created] (FLINK-21531) Introduce pluggable Parser

2021-02-28 Thread Rui Li (Jira)
Rui Li created FLINK-21531:
--

 Summary: Introduce pluggable Parser
 Key: FLINK-21531
 URL: https://issues.apache.org/jira/browse/FLINK-21531
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Rui Li
 Fix For: 1.13.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21530) Precompute TypeName's canonical string representation

2021-02-28 Thread Tzu-Li (Gordon) Tai (Jira)
Tzu-Li (Gordon) Tai created FLINK-21530:
---

 Summary: Precompute TypeName's canonical string representation
 Key: FLINK-21530
 URL: https://issues.apache.org/jira/browse/FLINK-21530
 Project: Flink
  Issue Type: Improvement
  Components: Stateful Functions
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai
 Fix For: statefun-3.0.0


There's room for improvement in StateFun's {{PersistedRemoteFunctionValues}}, 
where we currently concatenate strings to build the typename string for each 
state value we attach to a {{ToFunction}} message.

This extra work can be easily avoided by precomputing the canonical typename 
string, since {{TypeName}}'s are immutable.
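
A small sketch of the kind of precomputation described above (class and method
names are made up for illustration and do not mirror the actual StateFun
{{TypeName}} class):

{code:java}
// Illustrative only: precompute an immutable object's canonical string once at
// construction time instead of concatenating it on every access.
public final class TypeNameSketch {
    private final String namespace;
    private final String name;
    private final String canonicalString; // precomputed because the object is immutable

    public TypeNameSketch(String namespace, String name) {
        this.namespace = namespace;
        this.name = name;
        this.canonicalString = namespace + "/" + name;
    }

    public String canonicalTypenameString() {
        return canonicalString; // no per-call string concatenation
    }
}
{code}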



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21529) FLIP-152: Hive Query Syntax Compatibility

2021-02-28 Thread Rui Li (Jira)
Rui Li created FLINK-21529:
--

 Summary: FLIP-152: Hive Query Syntax Compatibility
 Key: FLINK-21529
 URL: https://issues.apache.org/jira/browse/FLINK-21529
 Project: Flink
  Issue Type: New Feature
Reporter: Rui Li






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS]FLIP-163: SQL Client Improvements

2021-02-28 Thread Kurt Young
I also asked some users for their opinion on introducing a config prefixed
with "table" that has no effect on methods in the Table API and SQL. All of
them were kind of shocked by such a question, asking why we would do
anything like this.

This kind of reaction actually doesn't surprise me a lot, so I jumped in
and challenged this config option even after the FLIP had already been
accepted.

If we only have to define the execution behavior for multiple statements in
the SQL Client, we should only introduce a config option whose name tells
users its scope of effect. Prefixing it with "table" is definitely not a
good idea here.

Best,
Kurt


On Fri, Feb 26, 2021 at 9:39 PM Leonard Xu  wrote:

> Hi, all
>
> Looks like there’s only one divergence about the option [ table | sql-client
> ].dml-sync in this thread, correct me if I’m wrong.
>
> 1. Leaving the context of this thread, from a user's perspective,
> the table.xx configurations should take effect in Table API & SQL,
> the sql-client.xx configurations should only take effect in sql-client.
>  In my (the user's) opinion, other explanations are counterintuitive.
>
> 2. It should be pointed out that both the existing table.xx configurations
> like table.exec.state.ttl, table.optimizer.agg-phase-strategy,
> table.local-time-zone, etc., and the proposed sql-client.xx configurations
> like sql-client.verbose, sql-client.execution.max-table-result.rows
> comply with this convention.
>
> 3. Considering the portability to support different CLI tools (sql-client,
> sql-gateway, etc.), I prefer table.dml-sync.
>
> In addition, I think sql-client/sql-gateway/other CLI tools can be placed
> out of flink-table module even in an external project, this should not
> affect our conclusion.
>
>
> Hope this can help you.
>
>
> Best,
> Leonard
>
>
>
> > 在 2021年2月25日,18:51,Shengkai Fang  写道:
> >
> > Hi, everyone.
> >
> > Here is a summary of the discussion about the option. If the summary
> > has errors, please correct me.
> >
> > `table.dml-sync`:
> > - take effect for `executeMultiSql` and sql client
> > - benefit: SQL script portability. One script for all platforms.
> > - drawback: Don't work for `TableEnvironment#executeSql`.
> >
> > `table.multi-dml-sync`:
> > - take effect for `executeMultiSql` and sql client
> > - benefit: SQL script portability
> > - drawback: It's confusing when the SQL script has only one DML statement but
> > still needs the option `table.multi-dml-sync` to be set
> >
> > `client.dml-sync`:
> > - take effect for sql client only
> > - benefit: clear definition.
> > - drawback: Every platform needs to define its own option. Bad SQL script
> > portability.
> >
> > Just as Jark said, I think `table.dml-sync` is a good choice if we can
> > extend its scope and make this option work for `executeSql`.
> > It's straightforward and users can use this option now in the Table API. The
> > drawback is that `TableResult#await` plays the same role as the option. I
> > don't think the drawback is really critical because many systems have
> > commands that play the same role under different names.
> >
> > Best,
> > Shengkai
> >
> > Timo Walther  于2021年2月25日周四 下午4:23写道:
> >
> >> The `table.` prefix is meant to be a general option in the table
> >> ecosystem. Not necessarily attached to Table API or SQL Client. That's
> >> why SQL Client is also located in the `flink-table` module.
> >>
> >> My main concern is the SQL script portability. Declaring the sync/async
> >> behavior will happen in many SQL scripts. And users should be able to
> >> easily switch from the SQL Client to some commercial product without
> >> needing to change the script again.
> >>
> >> Sure, we can change from `sql-client.dml-sync` to `table.dml-sync` later
> >> but that would mean introducing future confusion. An app name (what
> >> `sql-client` kind of is) should not be part of a config option key if
> >> other apps will need the same kind of option.
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> On 24.02.21 08:59, Jark Wu wrote:
>  From my point of view, I also prefer "sql-client.dml-sync",
> >>> because the behavior of this configuration is very clear.
> >>> Even if we introduce a new config in the future, e.g. `table.dml-sync`,
> >>> we can also deprecate the sql-client one.
> >>>
> >>> Introducing a "table."  configuration without any implementation
> >>> will confuse users a lot, as they expect it should take effect on
> >>> the Table API.
> >>>
> >>> If we want to introduce a unified "table.dml-sync" option, I prefer
> >>> it should be implemented on Table API and affect all the DMLs on
> >>> Table API (`tEnv.executeSql`, `Table.executeInsert`, `StatementSet`),
> >>> as I have mentioned before [1].
> >>>
>  It would be very straightforward that it affects all the DMLs on SQL
> CLI
> >>> and
> >>> TableEnvironment (including `executeSql`, `StatementSet`,
> >>> `Table#executeInsert`, etc.).
> >>> This can also make SQL CLI easy to support this configuration by
> passing
> >>> through 

[jira] [Created] (FLINK-21528) Rest Api Support Wildcard

2021-02-28 Thread wxmimperio (Jira)
wxmimperio created FLINK-21528:
--

 Summary: Rest Api Support Wildcard
 Key: FLINK-21528
 URL: https://issues.apache.org/jira/browse/FLINK-21528
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / REST
Affects Versions: 1.11.1
Reporter: wxmimperio
 Attachments: image-2021-03-01-12-00-36-269.png, 
image-2021-03-01-12-05-05-881.png, image-2021-03-01-12-06-15-340.png

!image-2021-03-01-12-00-36-269.png!

To obtain detailed metrics information, you need to specify the complete 
metrics name.

But these names are automatically generated, so they can only be found by 
obtaining a list of all metric names and then filtering by keywords.

For example, the content of the red box is automatically generated without 
knowing in advance.

!image-2021-03-01-12-06-15-340.png!
I can only get all the metric names and then filter for TableSourceScan and 
join-time-max.

If I could request http://xxx/metrics/get=*join-time-max, I could easily filter 
for what I want.
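
For reference, a sketch of the client-side filtering workaround described above
(fetch the full metric-name list from the REST API first, then filter locally;
the metric names below are placeholders):

{code:java}
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class MetricWildcardFilterSketch {
    // Turns a simple wildcard like "*join-time-max" into a regex and filters the
    // metric-name list that was fetched from the REST metrics endpoint.
    static List<String> filter(List<String> allMetricNames, String wildcard) {
        Pattern p = Pattern.compile("\\Q" + wildcard.replace("*", "\\E.*\\Q") + "\\E");
        return allMetricNames.stream()
                .filter(name -> p.matcher(name).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList(
                "0.Source__TableSourceScan.join-time-max",
                "0.Source__TableSourceScan.numRecordsOut");
        System.out.println(filter(names, "*join-time-max"));
    }
}
{code}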

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21527) fromValues function cost enormous network buffer

2021-02-28 Thread Spongebob (Jira)
Spongebob created FLINK-21527:
-

 Summary: fromValues function cost enormous network buffer
 Key: FLINK-21527
 URL: https://issues.apache.org/jira/browse/FLINK-21527
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.12.1
Reporter: Spongebob
 Attachments: image-2021-03-01-11-37-25-416.png

When using fromValues to build a Table object, Flink consumes an enormous number 
of network buffers; even less than 100k of raw data will consume more than 2048 
network memory buffers.
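
A minimal usage sketch of the code path being reported (the actual data that
triggers the buffer consumption is not included in the report, so the rows below
are placeholders):

{code:java}
import static org.apache.flink.table.api.Expressions.row;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class FromValuesSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());

        // fromValues builds an in-memory values source; the report observes that even
        // small inputs (< 100k) can require more than 2048 network buffers on 1.12.1.
        Table values = tEnv.fromValues(row(1, "a"), row(2, "b"), row(3, "c"));
        values.execute().print();
    }
}
{code}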



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21526) Replace scheduler benchmarks with wrapper classes

2021-02-28 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-21526:


 Summary: Replace scheduler benchmarks with wrapper classes
 Key: FLINK-21526
 URL: https://issues.apache.org/jira/browse/FLINK-21526
 Project: Flink
  Issue Type: Sub-task
  Components: Benchmarks, Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Zhilong Hong
 Fix For: 1.13.0


After moving the implementations of scheduler benchmarks to the Flink 
repository, we can replace scheduler benchmarks in flink-benchmark with wrapper 
classes.

This will make sure the compilation and execution of flink-benchmark will not 
fail because of modifications in interfaces/methods used by scheduler 
benchmarks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21525) Move scheduler benchmarks to Flink and add unit tests

2021-02-28 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-21525:


 Summary: Move scheduler benchmarks to Flink and add unit tests
 Key: FLINK-21525
 URL: https://issues.apache.org/jira/browse/FLINK-21525
 Project: Flink
  Issue Type: Sub-task
  Components: Benchmarks, Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Zhilong Hong
 Fix For: 1.13.0


To improve the stability of scheduler benchmarks, we'll move the implementation 
of scheduler benchmarks to the Flink repository and add several unit tests. 
When someone modifies the interfaces/methods used by scheduler benchmarks, the 
unit tests will fail.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21524) Replace scheduler benchmarks with wrapper classes

2021-02-28 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-21524:


 Summary: Replace scheduler benchmarks with wrapper classes
 Key: FLINK-21524
 URL: https://issues.apache.org/jira/browse/FLINK-21524
 Project: Flink
  Issue Type: Improvement
  Components: Benchmarks, Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Zhilong Hong
 Fix For: 1.13.0


Due to FLINK-21514, we find that when someone modifies interfaces and methods 
used by scheduler benchmarks, the compilation and execution of flink-benchmark 
may break. 

 

To improve the stability of scheduler benchmarks, we decided to replace 
scheduler benchmarks with wrapper/executor classes and move the current 
implementations to the Flink repository. Also, we'll implement several unit 
tests based on the implementations. In this way, when someone modifies 
interfaces and methods used by scheduler benchmarks, the unit tests fail, and 
then they must fix the broken benchmarks.
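
A sketch of the wrapper/executor split being proposed (class and method names
below are hypothetical, not the actual flink-benchmark classes):

{code:java}
// Hypothetical illustration: the benchmark logic lives in the Flink repository
// (covered by unit tests there), while flink-benchmark keeps only a thin wrapper.
public class SchedulingBenchmarkExecutor {          // would live in the Flink repo
    public void setup() { /* build the test ExecutionGraph / topology */ }
    public void run() { /* the measured scheduling work */ }
    public void teardown() { /* release resources */ }
}

class SchedulingBenchmarkWrapper {                  // would live in flink-benchmark
    private final SchedulingBenchmarkExecutor executor = new SchedulingBenchmarkExecutor();

    // The JMH-annotated method in flink-benchmark would simply delegate, so interface
    // changes in Flink break the unit tests in Flink rather than the benchmark build.
    public void benchmark() {
        executor.setup();
        executor.run();
        executor.teardown();
    }
}
{code}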



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-02-28 Thread Kezhu Wang
In “stop-with-savepoint —drain”, MAX_WATERMARK is not an issue. For normal
finishing task, not allowing unaligned checkpoint does not solve the
problem as MAX_WATERMARK could be persisted in downstream task. When
scenario @Piotr depicted occurs, downstream(or further downstream) window
operator will count all inputs as late.

> If we ensure that the MAX_WATERMARK is only persisted in state if a
recovery will trigger immediately the shut down of this operator, then it
shouldn't be an issue.

You are right in case the assumption holds; I had the same thought as you
before. But I am kind of worried about whether it is too perfect and hence
fragile. This requires a strong guarantee from the implementation that a
recovery from the TERMINATING stage goes directly to that stage.

> I think the MAX_WATERMARK event should be sent either just before or with
the EndOfPartitionEvent.

I prefer “with the EndOfPartitionEvent”. EndOfPartitionEvent itself already
carries what the ending MAX_WATERMARK tries to express. Maybe we can reuse
it? @Piotr

A preview work [1] from @Yun in PR#14831 interprets EndOfPartitionEvent as a
checkpoint barrier if there are pending checkpoints.


[1]:
https://github.com/gaoyunhaii/flink/commit/1aaa80fb0afad13a314b06783c61c01b9414f91b#diff-d4568b34060bc589cd1354bd125c35aca993dd0c9fe9e4460dfed73501116459R177


Best,
Kezhu Wang

On February 28, 2021 at 21:23:31, Till Rohrmann (trohrm...@apache.org)
wrote:

I think you are right with the problem of endOfInput. endOfInput should not
be used to commit final results. In fact if this termination fails then we
might end up in a different outcome of the job which is equally valid as
the one before the failure.

Concerning unaligned checkpoints, I think they don't play well together
with draining a streaming pipeline. The problem is that in the draining
case you want to process all records which are still in flight but
unaligned checkpoints don't guarantee this as they can jump over in-flight
records.

I think the MAX_WATERMARK event should be sent either just before or with
the EndOfPartitionEvent. If we ensure that the MAX_WATERMARK is only
persisted in state if a recovery will trigger immediately the shut down of
this operator, then it shouldn't be an issue.

Cheers,
Till

On Sun, Feb 28, 2021 at 9:06 AM Kezhu Wang  wrote:

> Hi Till,
>
> Just for bookkeeping, some observations from current implementation.
>
> > With this model, the final checkpoint is quite simple because it is
> ingrained in the lifecycle of an operator. Differently said an operator
> will only terminate after it has committed its side effects and seen the
> notifyCheckpointComplete message (if it is stateful).
>
> Currently, we could not mark this operator(or subtask) as terminated since
> result of `notifyCheckpointComplete`(possible side effect committing) is
> not taken into account of the belonging checkpoint. The job has to run to
> next safe point(finished or next checkpoint success) to be marked as
> “terminated”.
>
> > How to implement the stop-with-savepoint --drain/terminate command with
> this model: One idea could be to tell the sources that they should stop
> reading. This should trigger the EndOfPartitionEvent to be sent
> downstream.
> This will transition all operators into the TERMINATING state.
>
> Currently, EndOfPartitionEvent is only issued after StreamTask.invoke
> returned. To achieve the above, the following work would be required:
> * Promote  `EndOfPartitionEvent` from `Task` to `StreamTask`. This may
> have some interferences with BatchTask or network io stack.
> * Or introducing stream task level `EndOfUserRecordsEvent`(from PR#14831
> @Yun @Piotr)
> * Or special handling of `CheckpointType.SAVEPOINT_TERMINATE`.
>
> Besides this, I would like to quote some discussion from FLINK-21467
> between @Piotr and me:
>
> From @Piotr
> > Note, that it's not only that endOfInput can be called multiple times.
> There is a very remote possibility that the following scenario will happen:
> 1. checkpoint is taken (successfully)
> 2. sources are finishing
> 3. endOfInput is issued
> 4. job fails
> 5. job restarts to checkpoint 1.
> 6. after failover, because of some non deterministic logic in the source,
> sources are not finishing
>
> From me
> > But I think there is little work Flink can do to cope with this kind of
> issues. The checkpoint could be a savepoint triggered from user side and
> the "non deterministic logic" could be a change from user(eg. changing of
> stoppingOffsets in KafkaSource).
>
> > I think the "non deterministic logic" could cause trouble in combination
> with unaligned checkpoint and downstream window operator. Unaligned
> checkpoint will persist "MAX_WATERMARK" in state, after restarting,
> "MAX_WATERMARK" will cause downstream window operator ignores all future
> inputs.
>
> FLIP-147 demands no new records from end-of-stream-flushing, but source
> will emit “MAX_WATERMARK” on ending. Previously, I thought it was not a
> valid issue, but it turns out that it could cause trouble under the scenario
> listed by @Piotr if I am not wrong.

[jira] [Created] (FLINK-21523) ArrayIndexOutOfBoundsException occurs while run a hive streaming source job with partitioned table source

2021-02-28 Thread zouyunhe (Jira)
zouyunhe created FLINK-21523:


 Summary: ArrayIndexOutOfBoundsException occurs while run a hive 
streaming source job with partitioned table source 
 Key: FLINK-21523
 URL: https://issues.apache.org/jira/browse/FLINK-21523
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive
Affects Versions: 1.12.1
Reporter: zouyunhe


We have two Hive tables; the DDL is as below:
{code:java}
-- test_tbl5
create table test.test_5
 (dpi int,
  uid bigint)
partitioned by (day string, hour string) stored as parquet;

-- test_tbl3
create table test.test_3(
  dpi int,
  uid bigint,
  itime timestamp) stored as parquet;{code}
Then add a partition to test_tbl5:
{code:java}
alter table test_tbl5 add partition(day='2021-02-27',hour='12');
{code}
We start a Flink streaming job to read the Hive table test_tbl5 and write the 
data into test_tbl3; the job's SQL is:
{code:java}
set test_tbl5.streaming-source.enable = true;
insert into hive.test.test_tbl3 select dpi, uid, cast(to_timestamp('2020-08-09 
00:00:00') as timestamp(9)) from hive.test.test_tbl5 where `day` = '2021-02-27';
{code}
and we see the following exception thrown:
{code:java}
2021-02-28 22:33:16,553 ERROR 
org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext - 
Exception while handling result from async call in SourceCoordinator-Source: 
HiveSource-test.test_tbl5. Triggering job 
failover.org.apache.flink.connectors.hive.FlinkHiveException: Failed to 
enumerate filesat 
org.apache.flink.connectors.hive.ContinuousHiveSplitEnumerator.handleNewSplits(ContinuousHiveSplitEnumerator.java:152)
 ~[flink-connector-hive_2.12-1.12.1.jar:1.12.1]at 
org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$4(ExecutorNotifier.java:136)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]at 
org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
 [flink-dist_2.12-1.12.1.jar:1.12.1]at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
[?:1.8.0_60]at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
[?:1.8.0_60]at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]Caused by: 
java.lang.ArrayIndexOutOfBoundsException: -1at 
org.apache.flink.connectors.hive.util.HivePartitionUtils.toHiveTablePartition(HivePartitionUtils.java:184)
 ~[flink-connector-hive_2.12-1.12.1.jar:1.12.1]at 
org.apache.flink.connectors.hive.HiveTableSource$HiveContinuousPartitionFetcherContext.toHiveTablePartition(HiveTableSource.java:417)
 ~[flink-connector-hive_2.12-1.12.1.jar:1.12.1]at 
org.apache.flink.connectors.hive.ContinuousHiveSplitEnumerator$PartitionMonitor.call(ContinuousHiveSplitEnumerator.java:237)
 ~[flink-connector-hive_2.12-1.12.1.jar:1.12.1]at 
org.apache.flink.connectors.hive.ContinuousHiveSplitEnumerator$PartitionMonitor.call(ContinuousHiveSplitEnumerator.java:177)
 ~[flink-connector-hive_2.12-1.12.1.jar:1.12.1]at 
org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$5(ExecutorNotifier.java:133)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_60]at 
java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[?:1.8.0_60]  
  at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
 ~[?:1.8.0_60]at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
 ~[?:1.8.0_60]... 3 more{code}
It seems the partitioned field is not found in the source table's field list.
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] Release 1.12.2, release candidate #2

2021-02-28 Thread Yuan Mei
Hey Roman,

Thank you very much for preparing RC2.

+1 from my side.

1. Verified Checksums and GPG signatures.
2. Verified that the source archives do not contain any binaries.
3. Successfully Built the source with Maven.
4. Started a local Flink cluster, ran the streaming WordCount example with
WebUI,
checked the output and JM/TM log, no suspicious output/log.
5. Repeat Step 4 with the binary release as well, no suspicious output/log.
6. Checked for source and binary release to make sure both an Apache
License file and a NOTICE file are included.
7. Manually verified that there are no pom file changes between 1.12.2-rc1 and
1.12.2-rc2; no obvious license problems.
8. Reviewed the release PR for RC2 updates, and double-confirmed the
change-list for 1.12.2.

Best,
Yuan

On Sat, Feb 27, 2021 at 7:19 AM Roman Khachatryan  wrote:

> Hi everyone,
>
> Please review and vote on the release candidate #2 for the version 1.12.2,
> as follows:
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
>
>
> The complete staging area is available for your review, which includes:
> * JIRA release notes [1],
> * the official Apache source release and binary convenience releases to be
> deployed to dist.apache.org [2], which are signed with the key with
> fingerprint 0D545F264D2DFDEBFD4E038F97B4625E2FCF517C [3],
> * all artifacts to be deployed to the Maven Central Repository [4],
> * source code tag "release-1.12.2-rc2" [5],
> * website pull request listing the new release and adding announcement blog
> post [6].
>
> The vote will be open for at least 72 hours. It is adopted by majority
> approval, with at least 3 PMC affirmative votes.
>
> [1]
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12349502=12315522
> [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.12.2-rc2/
> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> [4]
> https://repository.apache.org/content/repositories/orgapacheflink-1414/
> [5] https://github.com/apache/flink/releases/tag/release-1.12.2-rc2
> [6] https://github.com/apache/flink-web/pull/418
>
> Regards,
> Roman
>


Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-02-28 Thread Till Rohrmann
I think you are right with the problem of endOfInput. endOfInput should not
be used to commit final results. In fact if this termination fails then we
might end up in a different outcome of the job which is equally valid as
the one before the failure.

Concerning unaligned checkpoints, I think they don't play well together
with draining a streaming pipeline. The problem is that in the draining
case you want to process all records which are still in flight but
unaligned checkpoints don't guarantee this as they can jump over in-flight
records.

I think the MAX_WATERMARK event should be sent either just before or with
the EndOfPartitionEvent. If we ensure that the MAX_WATERMARK is only
persisted in state if a recovery will trigger immediately the shut down of
this operator, then it shouldn't be an issue.

Cheers,
Till

On Sun, Feb 28, 2021 at 9:06 AM Kezhu Wang  wrote:

> Hi Till,
>
> Just for bookkeeping, some observations from current implementation.
>
> > With this model, the final checkpoint is quite simple because it is
> ingrained in the lifecycle of an operator. Differently said an operator
> will only terminate after it has committed its side effects and seen the
> notifyCheckpointComplete message (if it is stateful).
>
> Currently, we could not mark this operator(or subtask) as terminated since
> result of `notifyCheckpointComplete`(possible side effect committing) is
> not taken into account of the belonging checkpoint. The job has to run to
> next safe point(finished or next checkpoint success) to be marked as
> “terminated”.
>
> > How to implement the stop-with-savepoint --drain/terminate command with
> this model: One idea could be to tell the sources that they should stop
> reading. This should trigger the EndOfPartitionEvent to be sent
> downstream.
> This will transition all operators into the TERMINATING state.
>
> Currently, EndOfPartitionEvent is only issued after StreamTask.invoke
> returned. To achieve above, possible works should be required:
> * Promote  `EndOfPartitionEvent` from `Task` to `StreamTask`. This may
> have some interferences with BatchTask or network io stack.
> * Or introducing stream task level `EndOfUserRecordsEvent`(from PR#14831
> @Yun @Piotr)
> * Or special handling of `CheckpointType.SAVEPOINT_TERMINATE`.
>
> Besides this, I would like to quote some discussion from FLINK-21467
> between @Piotr and me:
>
> From @Piotr
> > Note, that it's not only that endOfInput can be called multiple times.
> There is a very remote possibility that the following scenario will happen:
> 1. checkpoint is taken (successfully)
> 2. sources are finishing
> 3. endOfInput is issued
> 4. job fails
> 5. job restarts to checkpoint 1.
> 6. after failover, because of some non deterministic logic in the source,
> sources are not finishing
>
> From me
> > But I think there is little work Flink can do to cope with this kind of
> issues. The checkpoint could be a savepoint triggered from user side and
> the "non deterministic logic" could be a change from user(eg. changing of
> stoppingOffsets in KafkaSource).
>
> > I think the "non deterministic logic" could cause trouble in combination
> with unaligned checkpoint and downstream window operator. Unaligned
> checkpoint will persist "MAX_WATERMARK" in state, after restarting,
> "MAX_WATERMARK" will cause downstream window operator ignores all future
> inputs.
>
> FLIP-147 demands no new records from end-of-stream-flushing, but source
> will emit “MAX_WATERMARK” on ending. Previously, I thought it was not a
> valid issue, but it turns out that it could cause trouble under the scenario
> listed by @Piotr if I am not wrong.
>
>
> PR#14831: https://github.com/apache/flink/pull/14831
> FLINK-21467: https://issues.apache.org/jira/browse/FLINK-21467
>
>
> Best,
> Kezhu Wang
>
> On February 27, 2021 at 18:12:20, Till Rohrmann (trohrm...@apache.org)
> wrote:
>
> Thanks for all your thoughts. I think we should further think through
> whether to allow checkpoints after an operator has emitted all its records
> (e.g. after close is called currently) or not. I think by doing this we
> would nicely decouple the checkpoint taking from the operator lifecycle
> and
> wouldn't need special checkpoints/savepoints for the final checkpoint and
> stop-with-savepoint --drain. Let me try to explain this a bit more
> detailed.
>
> If we say an operator has the rough lifecycle RUNNING => TERMINATING =>
> TERMINATED where we go from RUNNING into TERMINATING after we have seen
> the
> EndOfPartitionEvent and flushed all our records. The operator goes from
> TERMINATING => TERMINATED if it has persisted all its possible side
> effects. Throughout all states, it is possible to trigger a checkpoint. A
> stateless operator will immediately go from TERMINATING to TERMINATED
> whereas a stateful operator would wait for another checkpoint to be
> triggered and successfully completed (notifyCheckpointComplete).
>
> With this model, the final checkpoint is quite simple because it is
> ingrained in 

[jira] [Created] (FLINK-21522) Iterative stream could not work with stop-with-savepoint

2021-02-28 Thread Kezhu Wang (Jira)
Kezhu Wang created FLINK-21522:
--

 Summary: Iterative stream could not work with stop-with-savepoint
 Key: FLINK-21522
 URL: https://issues.apache.org/jira/browse/FLINK-21522
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.12.1, 1.11.3, 1.13.0
Reporter: Kezhu Wang


A user reported this in the user mailing list: 
[http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Stateful-functions-2-2-and-stop-with-savepoint-td41772.html]

I copied the full mail body here:

{quote}
 I have an embedded function with a SinkFunction as an egress, implemented as 
this pseudo-code:

val serializationSchema = KafkaSchemaSerializationSchema(... props required to 
use a Confluent Schema Registry with Avro, auth etc ...)
return SinkFunctionSpec(EGRESS_ID, FlinkKafkaProducer(serializationSchema, 
props, AT_LEAST_ONCE))

Checkpointing and taking a savepoint without stopping work as expected.

However, when I run "flink stop " or even "flink stop --drain 
", the operation never completes, reporting IN_PROGRESS until I hit the 
"failure-cause: org.apache.flink.runtime.checkpoint.CheckpointException: 
Checkpoint expired before completing" CompletedException.

In the "Checkpoint History" it shows only 2 of my 3 operators completed their 
work:

Source: my-ingress-ingress -> router (my-ingress) | acknowledge: 1/1 (100%) | 
end-to-end duration: 638ms | data-size 1.38 KB
feedback-union -> functions -> Sink: my-egress-egress | acknowledge 0/1 0% | 
end-to-end duration: n/a | data-size: n/a
feedback | acknowledge: 1/1 (100%) | end-to-end duration: 626ms | data-size: 0 B

I've been unable to gain any insights from logs so far. Thoughts?
{quote}

I think it is what we overlooked in evaluation of 
[apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033], 
FLINK-21132 and FLINK-21133.

I think the problem is twofold in the current implementation:
 # {{StreamIterationHead}} does not finish itself.
 # There is a local feedback edge from {{StreamIterationTail}} to 
{{StreamIterationHead}} which could cause {{StreamIterationTail}} to block 
after {{StreamIterationHead}} has finished. Globally speaking, it is a loop. 
[~pnowojski] emphasized this in FLINK-21132 and FLINK-21133.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-02-28 Thread Kezhu Wang
Hi Till,

Just for bookkeeping, some observations from current implementation.

> With this model, the final checkpoint is quite simple because it is
ingrained in the lifecycle of an operator. Differently said an operator
will only terminate after it has committed its side effects and seen the
notifyCheckpointComplete message (if it is stateful).

Currently, we could not mark this operator(or subtask) as terminated since
result of `notifyCheckpointComplete`(possible side effect committing) is
not taken into account of the belonging checkpoint. The job has to run to
next safe point(finished or next checkpoint success) to be marked as
“terminated”.

> How to implement the stop-with-savepoint --drain/terminate command with
this model: One idea could be to tell the sources that they should stop
reading. This should trigger the EndOfPartitionEvent to be sent downstream.
This will transition all operators into the TERMINATING state.

Currently, EndOfPartitionEvent is only issued after StreamTask.invoke
returned. To achieve the above, the following work would be required:
* Promote  `EndOfPartitionEvent` from `Task` to `StreamTask`. This may have
some interferences with BatchTask or network io stack.
* Or introducing stream task level `EndOfUserRecordsEvent`(from PR#14831
@Yun @Piotr)
* Or special handling of `CheckpointType.SAVEPOINT_TERMINATE`.

Besides this, I would like to quote some discussion from FLINK-21467
between @Piotr and me:

From @Piotr
> Note, that it's not only that endOfInput can be called multiple times.
There is a very remote possibility that the following scenario will happen:
1. checkpoint is taken (successfully)
2. sources are finishing
3. endOfInput is issued
4. job fails
5. job restarts to checkpoint 1.
6. after failover, because of some non deterministic logic in the source,
sources are not finishing

From me
> But I think there is little work Flink can do to cope with this kind of
issues. The checkpoint could be a savepoint triggered from user side and
the "non deterministic logic" could be a change from user(eg. changing of
stoppingOffsets in KafkaSource).

> I think the "non deterministic logic" could cause trouble in combination
with unaligned checkpoint and downstream window operator. Unaligned
checkpoint will persist "MAX_WATERMARK" in state, after restarting,
"MAX_WATERMARK" will cause downstream window operator ignores all future
inputs.

FLIP-147 demands no new records from end-of-stream-flushing, but source
will emit “MAX_WATERMARK” on ending. Previously, I thought it was not a
valid issue, but it turns out that it could cause trouble under the scenario
listed by @Piotr if I am not wrong.


PR#14831: https://github.com/apache/flink/pull/14831
FLINK-21467: https://issues.apache.org/jira/browse/FLINK-21467


Best,
Kezhu Wang

On February 27, 2021 at 18:12:20, Till Rohrmann (trohrm...@apache.org)
wrote:

Thanks for all your thoughts. I think we should further think through
whether to allow checkpoints after an operator has emitted all its records
(e.g. after close is called currently) or not. I think by doing this we
would nicely decouple the checkpoint taking from the operator lifecycle and
wouldn't need special checkpoints/savepoints for the final checkpoint and
stop-with-savepoint --drain. Let me try to explain this a bit more
detailed.

If we say an operator has the rough lifecycle RUNNING => TERMINATING =>
TERMINATED where we go from RUNNING into TERMINATING after we have seen the
EndOfPartitionEvent and flushed all our records. The operator goes from
TERMINATING => TERMINATED if it has persisted all its possible side
effects. Throughout all states, it is possible to trigger a checkpoint. A
stateless operator will immediately go from TERMINATING to TERMINATED
whereas a stateful operator would wait for another checkpoint to be
triggered and successfully completed (notifyCheckpointComplete).

With this model, the final checkpoint is quite simple because it is
ingrained in the lifecycle of an operator. Differently said an operator
will only terminate after it has committed its side effects and seen the
notifyCheckpointComplete message (if it is stateful). Here it is important
to note that in the streaming case, different bounded operators can
terminate at different times. They don't have to terminate all with the
same checkpoint.
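
To make the lifecycle above easier to follow, here is a tiny sketch of the
described state transitions (purely illustrative; this is not how Flink models
operator state internally):

{code:java}
// Illustrative sketch of the RUNNING => TERMINATING => TERMINATED lifecycle
// described above; not actual Flink runtime code.
public class OperatorLifecycleSketch {

    enum State { RUNNING, TERMINATING, TERMINATED }

    private final boolean stateful;
    private State state = State.RUNNING;

    OperatorLifecycleSketch(boolean stateful) {
        this.stateful = stateful;
    }

    // Called once EndOfPartitionEvent has been seen and all records are flushed.
    void onEndOfInputFlushed() {
        // A stateless operator terminates immediately; a stateful one waits in TERMINATING.
        state = stateful ? State.TERMINATING : State.TERMINATED;
    }

    // A stateful operator waits for one more completed checkpoint to commit side effects.
    void onNotifyCheckpointComplete() {
        if (stateful && state == State.TERMINATING) {
            state = State.TERMINATED;
        }
    }

    boolean canTriggerCheckpoint() {
        return state != State.TERMINATED; // possible throughout RUNNING and TERMINATING
    }
}
{code}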

How to implement the stop-with-savepoint --drain/terminate command with
this model: One idea could be to tell the sources that they should stop
reading. This should trigger the EndOfPartitionEvent to be sent downstream.
This will transition all operators into the TERMINATING state. Next the JM
can trigger a checkpoint to shut the operators down and not to wait for the
next automatic checkpoint trigger event.

By allowing checkpoints throughout the entire lifecycle of an operator we
disallow sending records from notifyCheckpointComplete because this message
will also be sent in the state TERMINATING where an operator has already
produced all of its records.

What do