[jira] [Comment Edited] (FLINK-34704) Process checkpoint barrier in AsyncWaitOperator when the element queue is full

2024-04-11 Thread Gyula Fora (Jira)


[ https://issues.apache.org/jira/browse/FLINK-34704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836106#comment-17836106 ]

Gyula Fora edited comment on FLINK-34704 at 4/11/24 10:41 AM:
--

I agree with [~pnowojski] here: the currently blocked element would be lost in 
the checkpoint. But [~Zakelly] also has a valid point.

I have played around with this, though, and under certain circumstances there 
is a simple optimisation that can be made for the async operator.

If the AWOP is the head of the operator chain (no chained upstream operator), 
we could actually checkpoint while yielding, but we would also need to 
checkpoint the currently processed element as part of the buffer (temporarily 
increasing the size of the buffer by 1).
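A minimal sketch of that idea, assuming a simplified single-threaded model rather than the real {{AsyncWaitOperator}} internals (the class {{HeadOfChainBufferSketch}} and its methods are hypothetical). When the buffer is full, the element handed to {{processElement()}} is remembered, and a snapshot taken while yielding includes it, so the checkpointed state can hold capacity + 1 elements. This is only safe under the condition above, i.e. when there is no chained upstream operator whose state could be mid-update.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model (not Flink's AsyncWaitOperator): a bounded in-flight
 * buffer that, for an operator at the head of the chain, may be snapshotted
 * while blocked on a full buffer by including the element it is trying to add.
 */
class HeadOfChainBufferSketch<T> {
    private final int capacity;
    private final ArrayDeque<T> inFlight = new ArrayDeque<>();
    // Element passed to processElement() that could not be enqueued yet.
    private T blockedElement;

    HeadOfChainBufferSketch(int capacity) {
        this.capacity = capacity;
    }

    /** Returns true if enqueued; false means the caller must yield and retry. */
    boolean tryAdd(T element) {
        if (inFlight.size() < capacity) {
            inFlight.addLast(element);
            blockedElement = null;
            return true;
        }
        blockedElement = element; // remember it so a snapshot does not lose it
        return false;
    }

    /** Simulates the oldest async request completing and freeing one slot. */
    T completeOldest() {
        return inFlight.pollFirst();
    }

    /**
     * Snapshot taken while yielding: up to capacity + 1 elements, i.e. the
     * buffer is "temporarily increased by 1" to include the blocked element.
     */
    List<T> snapshot() {
        List<T> state = new ArrayList<>(inFlight);
        if (blockedElement != null) {
            state.add(blockedElement);
        }
        return state;
    }
}
```

On restore, the blocked element would simply be re-issued together with the other in-flight elements.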

This is still related to the other ticket in the sense that we need to receive 
the checkpoint trigger during yield, but it needs custom logic in the AWOP to 
allow checkpointing while it is blocked on the full buffer.



> Process checkpoint barrier in AsyncWaitOperator when the element queue is full
> --
>
> Key: FLINK-34704
> URL: https://issues.apache.org/jira/browse/FLINK-34704
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Task
> Reporter: Zakelly Lan
> Priority: Minor
>
> As discussed in 
> https://lists.apache.org/thread/4f7ywn29kdv4302j2rq3fkxc6pc8myr2 , maybe it 
> would be better to provide a new `yield` that can process low-priority mail 
> in the mailbox executor. More discussion is needed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34704) Process checkpoint barrier in AsyncWaitOperator when the element queue is full

2024-04-11 Thread Piotr Nowojski (Jira)


[ https://issues.apache.org/jira/browse/FLINK-34704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836083#comment-17836083 ]

Piotr Nowojski edited comment on FLINK-34704 at 4/11/24 9:57 AM:
-

Yes, I've read the referenced dev mailing list thread, and I still don't agree 
with its conclusion that AWOP should allow a checkpoint to happen while it is 
{{.yield()}}'ing :)
{quote}
the checkpoint still happen when some user code is running.
{quote}
If you are talking about the async user code in AWOP, that's not a problem and 
never has been. When I talk about state inconsistency of upstream chained 
operators, I mean exactly that: the problem is for an UPSTREAM operator that is 
CHAINED with AWOP:

network input -> OperatorA -> AWOP -> network output

OperatorA's state can become corrupted/inconsistent if you allow a checkpoint 
to happen while {{AWOP}} invokes {{.yield()}}. That's the whole point of the 
yield to downstream (https://issues.apache.org/jira/browse/FLINK-13063). As a 
side note, this is not only a problem of {{AWOP}} but of any operator that wants 
to {{.yield()}}. Operators can only yield to downstream, which also means that 
checkpoints cannot be executed while yielding, and it also affects firing 
timers.
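To make the yield-to-downstream rule concrete, here is a simplified model with hypothetical names, not Flink's actual {{TaskMailbox}} implementation. Each mail carries the chain index of the operator that enqueued it; for the chain above, OperatorA is index 0 and {{AWOP}} index 1. While yielding, an operator may only run mails whose priority is >= its own, and task-level mails such as a checkpoint trigger are modelled with the lowest priority, so they stay queued until control returns to the task.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Simplified model of "yield to downstream" (FLINK-13063); all names are
 * hypothetical. A yielding operator only runs mails from itself or from
 * operators downstream of it (priority >= its chain index).
 */
class YieldToDownstreamSketch {
    // Assumed lowest priority, used here for task-level mails like checkpoints.
    static final int TASK_PRIORITY = -1;

    record Mail(int priority, String name) {}

    private final List<Mail> mailbox = new ArrayList<>();
    final List<String> executed = new ArrayList<>();

    void put(int priority, String name) {
        mailbox.add(new Mail(priority, name));
    }

    /** Runs the first mail eligible for a caller with the given priority. */
    boolean tryYield(int callerPriority) {
        Iterator<Mail> it = mailbox.iterator();
        while (it.hasNext()) {
            Mail mail = it.next();
            if (mail.priority() >= callerPriority) {
                it.remove();
                executed.add(mail.name());
                return true;
            }
        }
        return false; // nothing eligible: lower-priority mails stay queued
    }
}
```

With a checkpoint mail, an OperatorA mail and an AWOP async-result mail queued, {{tryYield(1)}} from AWOP runs only the async result; the checkpoint runs only once the task itself polls at {{TASK_PRIORITY}}.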

I have a WIP FLIP-443 that will introduce a neat trick to allow operators to 
return the execution back to {{StreamTask}} so that a checkpoint can happen, 
but AFAIU it has no application for {{AWOP}}. When {{AWOP}} wants to 
{{.yield()}}, it means it has no more space in the buffer to store any more 
records, so we cannot checkpoint {{AWOP}} until some buffer space becomes 
available. {{AWOP}} therefore needs to yield to downstream ({{.yield()}}) until 
buffer space becomes available, return from the {{#processElement()}} call, and 
then rely on a fix for FLINK-35051 for {{StreamTask}} to prioritise performing 
a checkpoint over executing more records/mails.
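A rough single-threaded sketch of that sequence, assuming hypothetical names ({{AwopFlowSketch}}, {{requestCheckpointOnNextYield}}) and a fake {{yieldToDownstream()}} that simply completes the oldest async request. A checkpoint request arriving during the yield is only recorded; the task loop performs it after {{processElement()}} has returned and before the next record, which is the kind of prioritisation FLINK-35051 is expected to provide.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of the flow above, not Flink code: processElement()
 * yields until a buffer slot frees up, enqueues the record and returns; the
 * task loop then performs any pending checkpoint before the next record.
 */
class AwopFlowSketch {
    private final int capacity;
    private final ArrayDeque<String> buffer = new ArrayDeque<>();
    final List<String> log = new ArrayList<>();
    boolean requestCheckpointOnNextYield;
    private boolean checkpointPending;

    AwopFlowSketch(int capacity) {
        this.capacity = capacity;
    }

    /** Stands in for a yield to downstream: completes the oldest request. */
    private void yieldToDownstream() {
        // A checkpoint request arriving now is only recorded, never executed here.
        if (requestCheckpointOnNextYield) {
            checkpointPending = true;
            requestCheckpointOnNextYield = false;
        }
        log.add("emit " + buffer.pollFirst());
    }

    void processElement(String record) {
        while (buffer.size() >= capacity) {
            yieldToDownstream();
        }
        buffer.addLast(record);
    }

    /** Task loop: a pending checkpoint is taken before the next record. */
    void run(List<String> input) {
        for (String record : input) {
            if (checkpointPending) {
                log.add("checkpoint " + buffer);
                checkpointPending = false;
            }
            processElement(record);
        }
    }
}
```

With capacity 1, input a, b, c and a request raised during the first yield, the log is "emit a", "checkpoint [b]", "emit b": the snapshot is taken between records, never in the middle of one.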





[jira] [Comment Edited] (FLINK-34704) Process checkpoint barrier in AsyncWaitOperator when the element queue is full

2024-04-10 Thread Piotr Nowojski (Jira)


[ https://issues.apache.org/jira/browse/FLINK-34704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17835796#comment-17835796 ]

Piotr Nowojski edited comment on FLINK-34704 at 4/10/24 3:46 PM:
-

Adding a new {{yield}} method wouldn't help, as we cannot perform a checkpoint 
while an operator is yielding, since that can lead to inconsistent state for 
upstream chained operators. For example, when an upstream operator does this:

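{code:java}
// OperatorXYZ (hypothetical), chained upstream of an operator that wants to yield
public void processElement(StreamRecord<Event> e) throws Exception {
  stateA = e.getValue().foo;
  output.collect(e); // chained downstream operators run inside this call
  stateB = e.getValue().bar;
}
{code}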
A downstream chained operator inside the {{output.collect(e)}} call cannot allow 
a checkpoint to be executed, since this would checkpoint {{OperatorXYZ}} in an 
inconsistent state ({{stateA}} already updated from {{e}}, {{stateB}} not yet).
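The inconsistency can be reproduced with a tiny standalone model. Here {{Consumer<Event>}} stands in for the operator's {{Output}}, and all names are illustrative rather than Flink APIs. The downstream "operator" takes a snapshot from inside the collect call, which captures {{stateA}} from the new record but {{stateB}} from before it.

```java
import java.util.function.Consumer;

/**
 * Illustration of the OperatorXYZ example: a checkpoint taken inside
 * output.collect(e) sees stateA updated but stateB still stale.
 * All names are hypothetical; Consumer stands in for Flink's Output.
 */
class ChainedSnapshotSketch {
    record Event(String foo, String bar) {}

    static class OperatorXYZ {
        String stateA = "initA";
        String stateB = "initB";

        void processElement(Event e, Consumer<Event> output) {
            stateA = e.foo();
            output.accept(e); // chained downstream operator runs here, synchronously
            stateB = e.bar();
        }

        String snapshot() {
            return stateA + "," + stateB;
        }
    }

    public static void main(String[] args) {
        OperatorXYZ upstream = new OperatorXYZ();
        String[] taken = new String[1];
        // Downstream operator that (incorrectly) checkpoints while inside collect().
        upstream.processElement(new Event("a1", "b1"), e -> taken[0] = upstream.snapshot());
        System.out.println(taken[0]); // prints "a1,initB": a state no record boundary produced
        System.out.println(upstream.snapshot()); // prints "a1,b1" after processElement() returns
    }
}
```

Restoring from the first snapshot would replay the record against a half-updated operator, which is why the checkpoint has to wait until {{processElement()}} returns.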

I've discovered this issue independently (but for other operators) and 
described the problem in FLINK-35051. I think a better solution would be to 
make {{StreamTask}} prioritise:
* processing priority messages from the network stack over processing mails
* executing out-of-order "urgent" mails, like timing out aligned checkpoint 
barriers into unaligned ones, or triggerCheckpoint RPCs for source tasks
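The proposed ordering could be modelled roughly as below. This is a sketch under stated assumptions with three hypothetical queues, not the actual {{StreamTask}} mailbox loop: each step first takes a priority network event, then an urgent mail, and only then a regular mail or record.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of the proposed StreamTask prioritisation: priority
 * network events (e.g. barriers) first, then "urgent" mails (e.g. a
 * triggerCheckpoint RPC or an aligned-to-unaligned timeout), then the rest.
 */
class PrioritisedTaskLoopSketch {
    final ArrayDeque<String> priorityNetworkEvents = new ArrayDeque<>();
    final ArrayDeque<String> urgentMails = new ArrayDeque<>();
    final ArrayDeque<String> regularMailsAndRecords = new ArrayDeque<>();
    final List<String> executed = new ArrayList<>();

    /** Runs one step, always picking the highest-priority available work. */
    boolean runOneStep() {
        String next = priorityNetworkEvents.pollFirst();
        if (next == null) {
            next = urgentMails.pollFirst();
        }
        if (next == null) {
            next = regularMailsAndRecords.pollFirst();
        }
        if (next == null) {
            return false; // idle
        }
        executed.add(next);
        return true;
    }

    void runUntilIdle() {
        while (runOneStep()) {
            // keep stepping until every queue is empty
        }
    }
}
```

If two records, a triggerCheckpoint RPC and a priority barrier are queued, the execution order is the barrier, the RPC, then both records, even though the records were enqueued first.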

I would propose closing this {{AsyncWaitOperator}}-specific ticket in favour 
of FLINK-35051.


