[jira] [Comment Edited] (FLINK-34704) Process checkpoint barrier in AsyncWaitOperator when the element queue is full
[ https://issues.apache.org/jira/browse/FLINK-34704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836106#comment-17836106 ]

Gyula Fora edited comment on FLINK-34704 at 4/11/24 10:41 AM:
--

I agree with [~pnowojski] here, the currently blocked element would be lost in the checkpoint. But [~Zakelly] also has a valid point.

I have played around with this, and under certain circumstances there is a simple optimisation to be made for the async operator. If the AWOP is the head of the operator chain (no upstream operator), we could actually checkpoint during yielding, but we would also need to checkpoint the element currently being processed as part of the buffer (temporarily increasing the size of the buffer by 1).

This is still related to the other ticket in the sense that we need to receive the checkpoint trigger during yield, but it needs custom logic in the AWOP to allow checkpointing while it is blocked on the full buffer.

> Process checkpoint barrier in AsyncWaitOperator when the element queue is full
> --
>
> Key: FLINK-34704
> URL: https://issues.apache.org/jira/browse/FLINK-34704
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Task
> Reporter: Zakelly Lan
> Priority: Minor
>
> As discussed in
> https://lists.apache.org/thread/4f7ywn29kdv4302j2rq3fkxc6pc8myr2 . Maybe it
> is better to provide such a new `yield` that can process mail with low
> priority in the mailbox executor. More discussion needed.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
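As a rough illustration of the head-of-chain idea in the comment above (snapshot the queued elements plus the one element blocked on the full buffer), here is a minimal toy sketch. It is not Flink code: {{ToyAsyncOperator}}, {{offer}}, {{snapshot}} and {{demo}} are hypothetical names invented for this example, and the real AWOP element queue and checkpointing logic are more involved.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class BlockedElementSnapshotSketch {

    /** Toy bounded in-flight queue; a stand-in, not Flink's actual element queue. */
    static final class ToyAsyncOperator {
        private final int capacity;
        private final Queue<String> inFlight = new ArrayDeque<>();
        private String blockedElement; // element waiting for free space, if any

        ToyAsyncOperator(int capacity) {
            this.capacity = capacity;
        }

        /** Returns true if queued, false if the element is now blocked on the full queue. */
        boolean offer(String element) {
            if (inFlight.size() < capacity) {
                inFlight.add(element);
                return true;
            }
            blockedElement = element;
            return false;
        }

        /** Snapshot = queued elements plus the blocked one, so up to capacity + 1 entries. */
        List<String> snapshot() {
            List<String> state = new ArrayList<>(inFlight);
            if (blockedElement != null) {
                state.add(blockedElement);
            }
            return state;
        }
    }

    /** Fills a queue of capacity 2, blocks on a third element, then snapshots. */
    static List<String> demo() {
        ToyAsyncOperator op = new ToyAsyncOperator(2);
        op.offer("a");
        op.offer("b");
        op.offer("c"); // queue is full, "c" becomes the blocked element
        return op.snapshot();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [a, b, c]
    }
}
```

The snapshot holds three entries although the queue capacity is two, which is the "temporarily increase the size of the buffer by 1" part: without the extra entry, element "c" would be missing from the checkpoint.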
[jira] [Comment Edited] (FLINK-34704) Process checkpoint barrier in AsyncWaitOperator when the element queue is full
[ https://issues.apache.org/jira/browse/FLINK-34704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836083#comment-17836083 ]

Piotr Nowojski edited comment on FLINK-34704 at 4/11/24 9:57 AM:
-

Yes, I've read the referenced dev mailing list thread and I still don't agree with the conclusion there, that AWOP should allow a checkpoint to happen while it's {{.yield()}}'ing :)

{quote}
the checkpoint still happen when some user code is running.
{quote}

If you are talking about async user code from AWOP, that's not a problem and never has been. When I'm talking about problems with state inconsistency of upstream chained operators, that's exactly what I mean. The problem is for an UPSTREAM operator that is CHAINED with AWOP:

network input -> OperatorA -> AWOP -> network output

OperatorA's state can become corrupted/inconsistent if you allow a checkpoint to happen while {{AWOP}} invokes {{.yield()}}. That's the whole point of the yield to downstream (https://issues.apache.org/jira/browse/FLINK-13063).

As a side note, it's not only a problem of {{AWOP}} but of any operator that wants to {{.yield()}}. Operators can only yield to downstream, which also means checkpoints cannot be executed, and it also affects firing timers. I have a WIP FLIP-443 that will introduce a neat trick to allow operators to return the execution back to {{StreamTask}} so that a checkpoint can happen, but AFAIU it has no application for the {{AWOP}}.

When {{AWOP}} wants to {{.yield()}}, it means it has no more space in the buffer to store any more records. So we cannot checkpoint {{AWOP}} until some buffer space becomes available. So {{AWOP}} needs to yield to downstream ({{.yield()}}) until buffer space becomes available, return from the {{#processElement()}} call, and then rely on a fix for FLINK-35051 for the stream task to prioritise performing a checkpoint over executing more records/mails.
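To make the "prioritise performing a checkpoint over executing more records/mails" step a bit more concrete, here is a minimal toy sketch of a task loop that runs urgent mails before ordinary ones. It only illustrates the ordering idea; {{Mail}}, {{drain}} and {{demo}} are hypothetical names made up for this example, and it does not reflect how {{StreamTask}} or the mailbox executor are actually implemented or what FLINK-35051 will end up doing.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class PrioritisedMailboxSketch {

    /** Hypothetical mail: a name plus a flag marking it as urgent (e.g. a checkpoint trigger). */
    static final class Mail {
        final String name;
        final boolean urgent;

        Mail(String name, boolean urgent) {
            this.name = name;
            this.urgent = urgent;
        }
    }

    /** Returns the execution order: all urgent mails first, then the rest in FIFO order. */
    static List<String> drain(List<Mail> mails) {
        Deque<Mail> urgent = new ArrayDeque<>();
        Deque<Mail> normal = new ArrayDeque<>();
        for (Mail mail : mails) {
            if (mail.urgent) {
                urgent.add(mail);
            } else {
                normal.add(mail);
            }
        }
        List<String> order = new ArrayList<>();
        while (!urgent.isEmpty() || !normal.isEmpty()) {
            Mail next = !urgent.isEmpty() ? urgent.poll() : normal.poll();
            order.add(next.name);
        }
        return order;
    }

    /** The checkpoint trigger arrives last but is executed first. */
    static List<String> demo() {
        List<Mail> mails = new ArrayList<>();
        mails.add(new Mail("record-1", false));
        mails.add(new Mail("record-2", false));
        mails.add(new Mail("triggerCheckpoint", true));
        return drain(mails);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [triggerCheckpoint, record-1, record-2]
    }
}
```

In this model the checkpoint only runs between {{processElement()}} calls, never inside one, so no operator in the chain is snapshotted halfway through an element.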
[jira] [Comment Edited] (FLINK-34704) Process checkpoint barrier in AsyncWaitOperator when the element queue is full
[ https://issues.apache.org/jira/browse/FLINK-34704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17835796#comment-17835796 ]

Piotr Nowojski edited comment on FLINK-34704 at 4/10/24 3:46 PM:
-

Adding a new {{yield}} method wouldn't help, as we cannot perform a checkpoint while an operator is yielding, since that can lead to inconsistent state for upstream chained operators. For example, when an upstream operator does this:

{code}
OperatorXYZ::processElement(e, output) {
  stateA = e.foo;
  output.collect(e);  // downstream chained operators run inside this call
  stateB = e.bar;
}
{code}

A downstream chained operator inside the {{output.collect(e)}} call cannot allow a checkpoint to be executed, since this would checkpoint {{OperatorXYZ}} in an inconsistent state.

I've discovered this issue independently (but for other operators) and described the problem in FLINK-35051. I think a better solution would be to make {{StreamTask}} prioritise:
* processing priority messages from the network stack over processing mails
* executing out-of-order "urgent" mails, like timing out aligned checkpoint barriers to unaligned ones or triggerCheckpoint RPCs for source tasks

I would propose to close this ticket for just {{AsyncWaitOperator}} in favour of FLINK-35051.
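The inconsistency described for {{OperatorXYZ}} can be reproduced in a small toy model. The sketch below is not Flink code: {{UpstreamOperator}}, {{snapshot}} and the two helper methods are hypothetical names, and the "checkpoint" is just a string captured from inside the downstream callback, standing in for a checkpoint taken while a downstream chained operator is yielding.

```java
import java.util.function.Consumer;

public class YieldSnapshotSketch {

    /** Hypothetical stand-in for OperatorXYZ: sets stateA, emits, then sets stateB. */
    static final class UpstreamOperator {
        int stateA;
        int stateB;

        void processElement(int value, Consumer<Integer> output) {
            stateA = value;
            output.accept(value); // the downstream chained operator runs inside this call
            stateB = value;
        }

        String snapshot() {
            return "A=" + stateA + ",B=" + stateB;
        }
    }

    /** Takes the snapshot while the downstream callback for element 2 is still running. */
    static String snapshotDuringCollect() {
        UpstreamOperator op = new UpstreamOperator();
        String[] taken = new String[1];
        op.processElement(1, v -> { });                       // element 1 fully processed
        op.processElement(2, v -> taken[0] = op.snapshot());  // "checkpoint" mid-element
        return taken[0];
    }

    /** Takes the snapshot only after processElement has returned. */
    static String snapshotAfterReturn() {
        UpstreamOperator op = new UpstreamOperator();
        op.processElement(1, v -> { });
        op.processElement(2, v -> { });
        return op.snapshot();
    }

    public static void main(String[] args) {
        System.out.println(snapshotDuringCollect()); // A=2,B=1
        System.out.println(snapshotAfterReturn());   // A=2,B=2
    }
}
```

The first snapshot mixes state from element 2 ({{stateA}}) with state from element 1 ({{stateB}}), which is the inconsistent state the comment warns about; the second is taken between elements and is consistent.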