[jira] [Commented] (FLINK-7949) AsyncWaitOperator is not restarting when queue is full
[ https://issues.apache.org/jira/browse/FLINK-7949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326323#comment-16326323 ]

Till Rohrmann commented on FLINK-7949:
--
Fixed in 1.4.1 via 7d040fd40c2816e829c81cb38177b6e1579c761c
9014167987cfcd108e7316281d562b7d85c12fba

> AsyncWaitOperator is not restarting when queue is full
> --
>
> Key: FLINK-7949
> URL: https://issues.apache.org/jira/browse/FLINK-7949
> Project: Flink
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.3.2
> Reporter: Bartłomiej Tartanus
> Assignee: Bartłomiej Tartanus
> Priority: Critical
> Fix For: 1.5.0, 1.4.1
>
> Original Estimate: 0.25h
> Remaining Estimate: 0.25h
>
> The issue was described here:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-was-declined-tasks-not-ready-td16066.html
> Issue: AsyncWaitOperator cannot restart properly after a failure (the thread waits forever).
> Scenario to reproduce this issue:
> 1. The queue is full (assume its capacity is N elements).
> 2. One more element is pending, so the pendingStreamElementQueueEntry field in AsyncWaitOperator is not null, and the while loop in the addAsyncBufferEntry method keeps trying to add this element to the queue (but it is not added because the queue is full).
> 3. Now a snapshot is taken: the whole queue of N elements is written into the ListState in the snapshotState method and, more importantly, the pendingStreamElementQueueEntry is written to this list too.
> 4. The process is restarted, so it tries to recover all the elements and put them back into the queue, but the list of recovered elements holds N+1 elements while the queue capacity is only N. The process has not started yet, so it cannot process any element, and this one element waits endlessly. It is never added, and the process never processes anything. Deadlock.
> 5. The trigger fires and is indeed discarded because the process is not running yet.
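To make the arithmetic in steps 3 and 4 concrete, here is a minimal Java sketch. It assumes a plain java.util.concurrent.ArrayBlockingQueue as a stand-in for the operator's bounded queue and Integer values as stand-ins for the queue entries; the class and method names (RestoreDeadlockModel, snapshot, restoreWithoutConsumer) are hypothetical and not part of Flink. offer() with a short timeout replaces the blocking put() so that the demo terminates instead of hanging.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified model of the scenario; not Flink's AsyncWaitOperator code.
public class RestoreDeadlockModel {

    // Step 3: the snapshot stores all N queued entries plus the pending entry.
    public static List<Integer> snapshot(BlockingQueue<Integer> queue, Integer pendingEntry) {
        List<Integer> state = new ArrayList<>(queue);
        if (pendingEntry != null) {
            state.add(pendingEntry); // the state now holds N + 1 entries
        }
        return state;
    }

    // Step 4: refill a fresh queue of capacity N while nothing consumes from it.
    // Returns how many entries fit. offer() with a timeout stands in for a blocking
    // put(), which would wait forever on the first entry that does not fit.
    public static int restoreWithoutConsumer(List<Integer> state, int capacity) {
        BlockingQueue<Integer> restored = new ArrayBlockingQueue<>(capacity);
        int added = 0;
        try {
            for (Integer entry : state) {
                if (!restored.offer(entry, 100, TimeUnit.MILLISECONDS)) {
                    break; // no free slot and no consumer: a real put() would hang here
                }
                added++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return added;
    }

    public static void main(String[] args) {
        int capacity = 3;
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.add(i); // step 1: the queue is full
        }
        Integer pendingEntry = 99; // step 2: one more entry waits for a free slot

        List<Integer> state = snapshot(queue, pendingEntry);
        System.out.println("entries in snapshot: " + state.size());
        System.out.println("entries restored: " + restoreWithoutConsumer(state, capacity));
    }
}
```

Running main prints a snapshot size of 4 and a restored count of 3: the fourth entry has nowhere to go because nothing drains the queue yet.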
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7949) AsyncWaitOperator is not restarting when queue is full
[ https://issues.apache.org/jira/browse/FLINK-7949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322066#comment-16322066 ]

ASF GitHub Bot commented on FLINK-7949:
---
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4924
[jira] [Commented] (FLINK-7949) AsyncWaitOperator is not restarting when queue is full
[ https://issues.apache.org/jira/browse/FLINK-7949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16321849#comment-16321849 ]

ASF GitHub Bot commented on FLINK-7949:
---
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4924

Thanks a lot for this fix @bartektartanus. The changes look good to me. Merging your PR :-)
[jira] [Commented] (FLINK-7949) AsyncWaitOperator is not restarting when queue is full
[ https://issues.apache.org/jira/browse/FLINK-7949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16305562#comment-16305562 ]

ASF GitHub Bot commented on FLINK-7949:
---
Github user bartektartanus commented on the issue:

https://github.com/apache/flink/pull/4924

Hi @tillrohrmann, I've finally managed to write a simple test that fails without my change in the AsyncWaitOperator class. Test steps:
1. Add enough records to fill up the AsyncWaitOperator queue.
2. Add a record whose processing takes longer than the timeout, which causes a restart.
3. The data stream is restarted, and in the `open()` method it tries to add N+1 recovered stream elements to a queue of size N.
4. The test waits forever and fails due to a timeout.

It behaves exactly as I described before. But if the emitter is started earlier, the test eventually passes after two restarts (the `timeoutCounter` field in `TimeoutableFunction`). I tried to make this test as short and quick as possible. I don't know if this is the right file for it; please move it if it is not. I also hope this change will be in the next Flink version :) Happy New Year!
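The test idea in this comment, detecting the hang by bounding the restore with a timeout, can be sketched in plain Java as follows. This is an illustration under stated assumptions, not the test that was added to Flink: RestoreTimeoutCheck, restore and completesWithin are hypothetical names, an ArrayBlockingQueue stands in for the operator's queue, and a plain consumer thread stands in for the emitter.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical timeout-guarded restore check; names are illustrative, not Flink's test code.
public class RestoreTimeoutCheck {

    // Refills a queue of the given capacity with `entries` elements. The consumer
    // (standing in for the emitter) is started either before or after the refill.
    public static void restore(int entries, int capacity, boolean startConsumerFirst)
            throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < entries; i++) {
                    queue.take();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true);
        if (startConsumerFirst) {
            consumer.start();
        }
        for (int i = 0; i < entries; i++) {
            queue.put(i); // blocks forever on entry N + 1 if nobody is consuming
        }
        if (!startConsumerFirst) {
            consumer.start(); // never reached when entries > capacity
        }
    }

    // Returns true if the restore finishes within the timeout, false if it hangs.
    public static boolean completesWithin(int entries, int capacity,
                                          boolean startConsumerFirst, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r);
            t.setDaemon(true); // a hung restore must not keep the JVM alive
            return t;
        });
        Future<?> result = executor.submit(() -> {
            restore(entries, capacity, startConsumerFirst);
            return null;
        });
        try {
            result.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow(); // interrupts a thread stuck in put()
        }
    }

    public static void main(String[] args) {
        int capacity = 3;
        System.out.println("consumer started last:  " + completesWithin(capacity + 1, capacity, false, 500));
        System.out.println("consumer started first: " + completesWithin(capacity + 1, capacity, true, 500));
    }
}
```

With the consumer started last, the restore blocks on the (N+1)-th put() and completesWithin returns false; with the consumer started first, it returns true. Daemon threads plus shutdownNow() keep a hung restore from blocking JVM exit.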
[jira] [Commented] (FLINK-7949) AsyncWaitOperator is not restarting when queue is full
[ https://issues.apache.org/jira/browse/FLINK-7949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238298#comment-16238298 ]

ASF GitHub Bot commented on FLINK-7949:
---
Github user bartektartanus commented on the issue:

https://github.com/apache/flink/pull/4924

Yes, it fixes our issue: our test in [nussknacker](https://github.com/TouK/nussknacker) now passes. The process restarts seamlessly and keeps working after further restarts, but I haven't managed to reproduce this error in a simple Flink unit test (yet). Maybe next week :)
[jira] [Commented] (FLINK-7949) AsyncWaitOperator is not restarting when queue is full
[ https://issues.apache.org/jira/browse/FLINK-7949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238103#comment-16238103 ]

ASF GitHub Bot commented on FLINK-7949:
---
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4924

Thanks for the fix @bartektartanus. LGTM. Did you verify that this actually fixes the problem? Would be great if we could also add a unit test to guard against future regressions.
[jira] [Commented] (FLINK-7949) AsyncWaitOperator is not restarting when queue is full
[ https://issues.apache.org/jira/browse/FLINK-7949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16225085#comment-16225085 ]

ASF GitHub Bot commented on FLINK-7949:
---
GitHub user bartektartanus opened a pull request:

https://github.com/apache/flink/pull/4924

[FLINK-7949] AsyncWaitOperator is not restarting when queue is full

Change: the emitter thread is started BEFORE filling up the queue with the recovered elements.

Issue description: during a process restart, if the queue was full (N elements) and a pending element was waiting to be added to it, the queue could not fit all N+1 recovered elements and the thread was blocked forever.

As Till Rohrmann suggested here:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-was-declined-tasks-not-ready-td16066.html
I've changed the order of this code so that the emitter thread starts earlier.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bartektartanus/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4924.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #4924

commit 97620649ddfcf8f0320b20bdfdb69d9b44dd8f0c
Author: Bartłomiej Tartanus
Date: 2017-10-30T14:39:43Z

start emmiter thread BEFORE filling up the queue of recovered elements
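The reordering made in this pull request (start the emitter, then refill the queue) can be illustrated with a minimal sketch. It assumes a FIFO ArrayBlockingQueue in place of Flink's queue and a plain thread in place of the operator's emitter; EmitterFirstRestore and restore are hypothetical names, and the code only models the ordering, not Flink's actual restore code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical sketch of "start the emitter BEFORE refilling the queue".
public class EmitterFirstRestore {

    // Re-adds all recovered entries to a queue of the given capacity and
    // returns them in the order the emitter took them out.
    public static List<Integer> restore(List<Integer> recovered, int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        List<Integer> emitted = new CopyOnWriteArrayList<>();

        // 1. Start the emitter first, so it can free slots during the refill.
        Thread emitter = new Thread(() -> {
            try {
                for (int i = 0; i < recovered.size(); i++) {
                    emitted.add(queue.take()); // FIFO queue, so the recovered order is kept
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "emitter");
        emitter.start();

        // 2. Re-add every recovered entry, including the former pending one.
        try {
            for (Integer entry : recovered) {
                queue.put(entry); // may wait briefly, but the emitter keeps draining
            }
            emitter.join(); // the emitter stops after taking recovered.size() entries
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(emitted);
    }

    public static void main(String[] args) {
        // Capacity 3, but the snapshot holds 3 queued entries plus 1 pending entry.
        List<Integer> recovered = Arrays.asList(0, 1, 2, 99);
        System.out.println(restore(recovered, 3)); // prints [0, 1, 2, 99]
    }
}
```

Because the emitter is already draining the queue, all four recovered entries (three queued plus the former pending one) pass through a queue of capacity 3. Starting the emitter only after the refill loop would instead reproduce the hang from the issue description.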