[jira] [Commented] (FLINK-14434) Dispatcher#createJobManagerRunner should returns on creation succeed, not after startJobManagerRunner

2019-10-24 Thread Zili Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16958638#comment-16958638
 ] 

Zili Chen commented on FLINK-14434:
---

FYI, I just noticed that {{thenApply}} (and the other methods without an {{Async}}
suffix) may run in the current thread (instead of the thread executing the
previous computation) if the previous computation has already completed when
{{thenApply}} is called. Thus the analysis above is slightly wrong: we don't
ensure that the start is scheduled on the akka-dispatcher thread; it may run
synchronously within the {{thenApply}} call, which means in the MainThread.
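
The following is a minimal, self-contained illustration of this {{thenApply}} behaviour using only the JDK's {{CompletableFuture}}. It is not Flink code. The class name and the thread names {{main-thread-sim}} and {{akka-dispatcher-sim}} are hypothetical and only mirror the threads discussed here. The same non-async stage runs in the caller's thread when the previous stage is already complete at registration time. Otherwise it runs in the thread that completes the previous stage.

```java
import java.util.concurrent.CompletableFuture;

public class ThenApplyThreadDemo {

    /** Thread that runs thenApply when the previous stage is already complete at registration. */
    static String threadForCompletedStage() {
        CompletableFuture<String> created = CompletableFuture.completedFuture("runner");
        // Already complete: the non-async stage runs synchronously in the calling thread.
        return created.thenApply(runner -> Thread.currentThread().getName()).join();
    }

    /** Thread that runs thenApply when the previous stage completes later in another thread. */
    static String threadForPendingStage() {
        CompletableFuture<String> created = new CompletableFuture<>();
        // Registered while still pending: the stage runs in the thread that completes 'created'.
        CompletableFuture<String> stage = created.thenApply(runner -> Thread.currentThread().getName());
        new Thread(() -> created.complete("runner"), "akka-dispatcher-sim").start();
        return stage.join();
    }

    public static void main(String[] args) {
        Thread.currentThread().setName("main-thread-sim");
        System.out.println(threadForCompletedStage()); // main-thread-sim
        System.out.println(threadForPendingStage());   // akka-dispatcher-sim
    }
}
```

Using {{thenApplyAsync(fn, executor)}} with an explicit executor removes this ambiguity, because the function is then always handed to that executor.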

However, this fix isn't affected by this discovery. I'm noting it only as
further information in case this difference becomes significant.

> Dispatcher#createJobManagerRunner should returns on creation succeed, not 
> after startJobManagerRunner
> -
>
> Key: FLINK-14434
> URL: https://issues.apache.org/jira/browse/FLINK-14434
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.8.2, 1.10.0, 1.9.1
>Reporter: Zili Chen
>Assignee: Zili Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0, 1.8.3, 1.9.2
>
> Attachments: patch.diff
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> In an edge case, let's say
> 1) the job finishes almost immediately, and
> 2) the Dispatcher has been suspended in {{#startJobManagerRunner}} after
> {{jobManagerRunner.start();}} but before {{return jobManagerRunner;}}.
> Because
> 1) the future we put into {{jobManagerRunnerFutures}} completes only once
> {{#startJobManagerRunner}} has finished, and
> 2) the creation of the JobManagerRunner doesn't happen in the MainThread,
> the following execution order is possible:
> 1) the JobManagerRunner is created in the akka-dispatcher thread
> 2) then {{Dispatcher#startJobManagerRunner}} is applied
> 3) it runs {{jobManagerRunner.start();}} but not yet {{return jobManagerRunner;}}
> 4) this thread is suspended
> 5) the job finishes and the callback executes on the MainThread
> 6) {{jobManagerRunnerFutures.get(jobID).getNow(null)}} returns {{null}}
> because the akka-dispatcher thread hasn't executed {{return jobManagerRunner;}} yet
> 7) the callback reports {{There is a newer JobManagerRunner for the job}},
> although there is none.
> **Solution**
> There are two options, and we could even apply both:
> 1. return {{jobManagerRunnerFuture}} from {{#createJobManagerRunner}} on creation
> and make {{#startJobManagerRunner}} a separate action
> 2. once the JobManagerRunner is created, execute {{#startJobManagerRunner}} in
> the MainThread.
> CC [~trohrmann]
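
The interleaving described in the issue can be replayed deterministically with a sketch like the following. It is not the Dispatcher code or the attached patch, and it relies on several hypothetical stand-ins:

* a {{Runner}} class in place of {{JobManagerRunner}}
* a plain map in place of {{jobManagerRunnerFutures}}
* a single-thread executor in place of the akka-dispatcher
* latches in place of the thread suspension

The start step uses {{thenApplyAsync}} with an explicit executor so that it never runs in the calling thread. Otherwise the simulated MainThread itself could block.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RunnerFutureRace {

    /** Hypothetical stand-in for JobManagerRunner. */
    static final class Runner {
        void start() { }
    }

    /** Replays steps 1-6 and returns true if the MainThread check sees getNow(null) == null. */
    static boolean callbackSeesNull() {
        Map<String, CompletableFuture<Runner>> jobManagerRunnerFutures = new ConcurrentHashMap<>();
        CountDownLatch started = new CountDownLatch(1); // start() has been called
        CountDownLatch resume = new CountDownLatch(1);  // keeps the thread "suspended" before returning
        ExecutorService akkaDispatcher = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Runner> runnerFuture = CompletableFuture
                    .supplyAsync(Runner::new, akkaDispatcher)   // 1) created off the MainThread
                    .thenApplyAsync(runner -> {                 // 2) "startJobManagerRunner"
                        runner.start();                         // 3) start() returned ...
                        started.countDown();
                        awaitUninterruptibly(resume);           // 4) ... suspended before "return"
                        return runner;
                    }, akkaDispatcher);
            jobManagerRunnerFutures.put("job-1", runnerFuture);

            awaitUninterruptibly(started);                      // 5) the job finishes now
            boolean sawNull = jobManagerRunnerFutures.get("job-1").getNow(null) == null; // 6)
            // 7) a null here is what gets reported as "There is a newer JobManagerRunner for the job"
            resume.countDown();
            runnerFuture.join();
            return sawNull;
        } finally {
            akkaDispatcher.shutdown();
        }
    }

    private static void awaitUninterruptibly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(callbackSeesNull()); // true
    }
}
```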



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14434) Dispatcher#createJobManagerRunner should returns on creation succeed, not after startJobManagerRunner

2019-10-21 Thread Till Rohrmann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16956044#comment-16956044
 ] 

Till Rohrmann commented on FLINK-14434:
---

Thanks for the additional information [~tison]. I always prefer better 
solutions :-) Will take a look at your PR.



[jira] [Commented] (FLINK-14434) Dispatcher#createJobManagerRunner should returns on creation succeed, not after startJobManagerRunner

2019-10-18 Thread Zili Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16955058#comment-16955058
 ] 

Zili Chen commented on FLINK-14434:
---

Thanks for your reply [~trohrmann]. I found a possibly better option 3 for this
issue. The diff is tiny and expressive.

https://github.com/TisonKun/flink/commit/baacecb92f11cd367dc89bc48744fea6de94670b

In short, the problem described above is mainly caused by the fact that, when
the "job manager runner result future" calls back, the "job manager runner
created future" has conceptually finished but is not yet completed. Revisiting
the semantics of {{#createJobManagerRunner}}, we can simply return a future that
represents the creation and let the caller take care of the start.
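
Here is a rough sketch of this option 3 idea. It is not the code from the linked commit. The {{Runner}} class, the {{createJobManagerRunner}} stand-in, the map and the executor are all hypothetical simplifications. The map stores the creation future, and the caller chains the start onto it. As a result, a job-finished check that runs while {{start()}} is still in progress already finds the runner.

The start is chained with {{thenAcceptAsync}} on an explicit executor to keep the sketch deterministic. With plain {{thenAccept}} it could run in the caller's thread, as the 2019-10-24 comment points out.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CreationFutureFix {

    /** Hypothetical stand-in for JobManagerRunner. */
    static final class Runner {
        void start() { }
    }

    /** Hypothetical option-3 style createJobManagerRunner: the future completes on creation. */
    static CompletableFuture<Runner> createJobManagerRunner(ExecutorService akkaDispatcher) {
        return CompletableFuture.supplyAsync(Runner::new, akkaDispatcher);
    }

    /** Returns true if the job-finished check finds the runner while start() is still in progress. */
    static boolean callbackSeesRunner() {
        Map<String, CompletableFuture<Runner>> jobManagerRunnerFutures = new ConcurrentHashMap<>();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch resume = new CountDownLatch(1);
        ExecutorService akkaDispatcher = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Runner> created = createJobManagerRunner(akkaDispatcher);
            jobManagerRunnerFutures.put("job-1", created);
            // The caller takes care of the start, chained onto the creation future.
            CompletableFuture<Void> startFuture = created.thenAcceptAsync(runner -> {
                runner.start();
                started.countDown();
                awaitUninterruptibly(resume); // simulates the suspension described in the issue
            }, akkaDispatcher);

            awaitUninterruptibly(started);    // the job finishes while start() is still in progress
            boolean found = jobManagerRunnerFutures.get("job-1").getNow(null) != null;
            resume.countDown();
            startFuture.join();
            return found;
        } finally {
            akkaDispatcher.shutdown();
        }
    }

    private static void awaitUninterruptibly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(callbackSeesRunner()); // true
    }
}
```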

Compared with option 2, this approach has clearer semantics, with one subtle
difference: it starts the {{JobManagerRunner}} in the akka-dispatcher thread, not
in the MainThread. Internally we run into an issue if the JM runner is started in
the Dispatcher MainThread[1], but that issue doesn't exist in the community codebase.

I'm glad to help with FLINK-11843 and FLINK-11719 on the review side. For this
issue, I'd like to send this tiny patch as a pull request so that you can
coordinate the patches according to your schedule.

[1] FYI: It is an interesting case, but it deviates a bit from the community
codebase. We moved the job registry entirely into the Dispatcher, so when a job
manager runner is granted leadership, it sends an RPC to the Dispatcher to query
the current job scheduling status. Our fork is currently based on 1.7, so with
solution option 2 above there is an execution order that deadlocks:

1. the job manager runner's {{#start}} is called in the Dispatcher MainThread
2. the job manager runner's leader election service starts and, if it is
standalone (non-HA), directly calls grantLeadership
3. once granted leadership, the job manager runner sends an RPC to the Dispatcher
to query the status and waits for the result
4. because {{#start}} still occupies the MainThread, the later RPC cannot be
processed.

We could work around this case in many ways, such as dispatching the action a
bit differently, but it suggests that if we can schedule an action outside the
Dispatcher MainThread without relying on the synchronization provided by the
single thread, we had better do so.
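
Below is a hedged sketch of the deadlocking order. Two single-thread executors stand in for the Dispatcher MainThread and the akka-dispatcher thread; both are hypothetical, and the "RPC" is just a task submitted to the MainThread executor. A timeout replaces the real hang so that the sketch terminates. When {{start()}} runs on the MainThread, the query can never be served. When it runs on another thread, the query is answered.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class MainThreadSelfDeadlock {

    /**
     * Runs a simulated JobManagerRunner#start that, on being granted leadership, sends a query
     * "RPC" to the Dispatcher MainThread and waits for it. Returns true if the wait timed out.
     */
    static boolean rpcTimesOut(boolean startOnMainThread, long timeoutMillis) {
        ExecutorService dispatcherMainThread = Executors.newSingleThreadExecutor();
        ExecutorService akkaDispatcher = Executors.newSingleThreadExecutor();
        ExecutorService startExecutor = startOnMainThread ? dispatcherMainThread : akkaDispatcher;
        try {
            // 1) start() is scheduled on the chosen executor
            Future<Boolean> startResult = startExecutor.submit(() -> {
                // 2) + 3) standalone leader election grants leadership synchronously, and the
                // runner queries the Dispatcher, whose handlers run on the single MainThread
                Future<String> rpc = dispatcherMainThread.submit(() -> "job scheduling status");
                try {
                    rpc.get(timeoutMillis, TimeUnit.MILLISECONDS);
                    return false; // the RPC was answered
                } catch (TimeoutException e) {
                    return true;  // 4) the MainThread is still busy with start(), so the RPC never ran
                }
            });
            return startResult.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            dispatcherMainThread.shutdownNow();
            akkaDispatcher.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(rpcTimesOut(true, 200));   // true: self-deadlock, bounded by the timeout
        System.out.println(rpcTimesOut(false, 2000)); // false: the RPC is served
    }
}
```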



[jira] [Commented] (FLINK-14434) Dispatcher#createJobManagerRunner should returns on creation succeed, not after startJobManagerRunner

2019-10-18 Thread Till Rohrmann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16954784#comment-16954784
 ] 

Till Rohrmann commented on FLINK-14434:
---

Thanks for reporting this issue [~tison]. I think your analysis is right. I
would also prefer option two atm.

In fact, I want to rework this part and the {{JobManagerRunner}} a bit with
FLINK-11719. I hope to get rid of the concurrency introduced by the asynchronous
creation of the {{JobManagerRunner}} at the cost of a slightly changed submission
behaviour.



[jira] [Commented] (FLINK-14434) Dispatcher#createJobManagerRunner should returns on creation succeed, not after startJobManagerRunner

2019-10-17 Thread Zili Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16954316#comment-16954316
 ] 

Zili Chen commented on FLINK-14434:
---

Now I would prefer option 2 only, because with option 1 we might miss an
exception in JobManagerRunner#start: the user would receive a submission success,
but a request for the job result would fail with "job not found", because the
start failed and the job manager runner future was removed without a result.
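
A minimal sketch of the difference follows. It assumes a hypothetical {{Runner}} whose {{start()}} always throws, and a single-thread executor standing in for the Dispatcher MainThread. The returned boolean models whether the client is told that the submission succeeded. With option 1, the failure ends up in a future that nobody observes. With option 2, the acknowledgement is chained after {{start()}}, so the failure reaches the submitter. The sketch does not model the removal of the job manager runner future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StartFailurePropagation {

    /** Hypothetical stand-in for JobManagerRunner whose start() fails. */
    static final class Runner {
        void start() {
            throw new IllegalStateException("JobManagerRunner#start failed");
        }
    }

    /** Returns true if the client would be told that the submission succeeded. */
    static boolean submissionReportedAsSuccess(boolean startBeforeAcknowledging) {
        ExecutorService akkaDispatcher = Executors.newSingleThreadExecutor();
        ExecutorService mainThread = Executors.newSingleThreadExecutor(); // stand-in for the MainThread
        try {
            CompletableFuture<Runner> created = CompletableFuture.supplyAsync(Runner::new, akkaDispatcher);
            CompletableFuture<Void> acknowledgement;
            if (startBeforeAcknowledging) {
                // Option 2: start on the MainThread once created; acknowledge only after start() returned.
                acknowledgement = created.thenAcceptAsync(Runner::start, mainThread);
            } else {
                // Option 1: acknowledge on creation; start() is a separate action whose failure
                // only completes this unobserved future exceptionally.
                created.thenAccept(Runner::start);
                acknowledgement = created.thenAccept(runner -> { });
            }
            return acknowledgement.handle((ignored, failure) -> failure == null).join();
        } finally {
            akkaDispatcher.shutdown();
            mainThread.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(submissionReportedAsSuccess(false)); // true: the start failure is lost
        System.out.println(submissionReportedAsSuccess(true));  // false: the start failure reaches the client
    }
}
```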



[jira] [Commented] (FLINK-14434) Dispatcher#createJobManagerRunner should returns on creation succeed, not after startJobManagerRunner

2019-10-17 Thread Zili Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16953615#comment-16953615
 ] 

Zili Chen commented on FLINK-14434:
---

With the attached patch, this case can be reproduced. You will see the new test
hang, while it passes once the solutions mentioned above are applied.
