[jira] [Commented] (FLINK-23593) Performance regression on 15.07.2021

2021-08-09 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17395983#comment-17395983
 ] 

Zhu Zhu commented on FLINK-23593:
-

I tried the benchmarks locally before/after applying FLINK-23372 and did not 
see an obvious regression.
I also tried the benchmarks on commit f4afbf3e7de19ebcc5cb9324a22ba99fcd354dce (the 
last good commit on the 
[codespeed|http://codespeed.dak8s.net:8000/timeline/?ben=sortedTwoInput&env=2#/?exe=1,3,5&ben=sortedTwoInput&env=2&revs=200&equid=off&quarts=on&extr=on]
 curve) and eb8100f7afe1cd2b6fceb55b174de097db752fc7 (the first bad commit on the curve), 
but could not reproduce the regression either. Maybe it's due to the HDD, but I have 
no idea yet.







> Performance regression on 15.07.2021
> 
>
> Key: FLINK-23593
> URL: https://issues.apache.org/jira/browse/FLINK-23593
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Benchmarks
>Affects Versions: 1.14.0
>Reporter: Piotr Nowojski
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.14.0
>
>
> http://codespeed.dak8s.net:8000/timeline/?ben=sortedMultiInput&env=2
> http://codespeed.dak8s.net:8000/timeline/?ben=sortedTwoInput&env=2
> {noformat}
> pnowojski@piotr-mbp: [~/flink -  ((no branch, bisect started on pr/16589))] $ 
> git ls f4afbf3e7de..eb8100f7afe
> eb8100f7afe [4 weeks ago] (pn/bad, bad, refs/bisect/bad) 
> [FLINK-22017][coordination] Allow BLOCKING result partition to be 
> individually consumable [Thesharing]
> d2005268b1e [4 weeks ago] (HEAD, pn/bisect-4, bisect-4) 
> [FLINK-22017][coordination] Get the ConsumedPartitionGroup that 
> IntermediateResultPartition and DefaultResultPartition belong to [Thesharing]
> d8b1a6fd368 [3 weeks ago] [FLINK-23372][streaming-java] Disable 
> AllVerticesInSameSlotSharingGroupByDefault in batch mode [Timo Walther]
> 4a78097d038 [3 weeks ago] (pn/bisect-3, bisect-3, 
> refs/bisect/good-4a78097d0385749daceafd8326930c8cc5f26f1a) 
> [FLINK-21928][clients][runtime] Introduce static method constructors of 
> DuplicateJobSubmissionException for better readability. [David Moravek]
> 172b9e32215 [3 weeks ago] [FLINK-21928][clients] JobManager failover should 
> succeed, when trying to resubmit already terminated job in application mode. 
> [David Moravek]
> f483008db86 [3 weeks ago] [FLINK-21928][core] Introduce 
> org.apache.flink.util.concurrent.FutureUtils#handleException method, that 
> allows future to recover from the specied exception. [David Moravek]
> d7ac08c2ac0 [3 weeks ago] (pn/bisect-2, bisect-2, 
> refs/bisect/good-d7ac08c2ac06b9ff31707f3b8f43c07817814d4f) 
> [FLINK-22843][docs-zh] Document and code are inconsistent [ZhiJie Yang]
> 16c3ea427df [3 weeks ago] [hotfix] Split the final checkpoint related tests 
> to a separate test class. [Yun Gao]
> 31b3d37a22c [7 weeks ago] [FLINK-21089][runtime] Skip the execution of new 
> sources if finished on restore [Yun Gao]
> 20fe062e1b5 [3 weeks ago] [FLINK-21089][runtime] Skip execution for the 
> legacy source task if finished on restore [Yun Gao]
> 874c627114b [3 weeks ago] [FLINK-21089][runtime] Skip the lifecycle method of 
> operators if finished on restore [Yun Gao]
> ceaf24b1d88 [3 weeks ago] (pn/bisect-1, bisect-1, 
> refs/bisect/good-ceaf24b1d881c2345a43f305d40435519a09cec9) [hotfix] Fix 
> isClosed() for operator wrapper and proxy operator close to the operator 
> chain [Yun Gao]
> 41ea591a6db [3 weeks ago] [FLINK-22627][runtime] Remove unused slot request 
> protocol [Yangze Guo]
> 489346b60f8 [3 months ago] [FLINK-22627][runtime] Remove PendingSlotRequest 
> [Yangze Guo]
> 8ffb4d2af36 [3 months ago] [FLINK-22627][runtime] Remove TaskManagerSlot 
> [Yangze Guo]
> 72073741588 [3 months ago] [FLINK-22627][runtime] Remove SlotManagerImpl and 
> its related tests [Yangze Guo]
> bdb3b7541b3 [3 months ago] [hotfix][yarn] Remove unused internal options in 
> YarnConfigOptionsInternal [Yangze Guo]
> a6a9b192eac [3 weeks ago] [FLINK-23201][streaming] Reset alignment only for 
> the currently processed checkpoint [Anton Kalashnikov]
> b35701a35c7 [3 weeks ago] [FLINK-23201][streaming] Calculate checkpoint 
> alignment time only for last started checkpoint [Anton Kalashnikov]
> 3abec22c536 [3 weeks ago] [FLINK-23107][table-runtime] Separate 
> implementation of deduplicate rank from other rank functions [Shuo Cheng]
> 1a195f5cc59 [3 weeks ago] [FLINK-16093][docs-zh] Translate "System Functions" 
> page of "Functions" into Chinese (#16348) [ZhiJie Yang]
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-16069) Creation of TaskDeploymentDescriptor can block main thread for long time

2021-08-08 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-16069.
---
Resolution: Duplicate

> Creation of TaskDeploymentDescriptor can block main thread for long time
> 
>
> Key: FLINK-16069
> URL: https://issues.apache.org/jira/browse/FLINK-16069
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: huweihua
>Priority: Major
> Attachments: FLINK-16069-POC-results, batch.png, streaming.png
>
>
> The deployment of tasks can take a long time when we submit a high-parallelism 
> job. Execution#deploy runs in the main thread, so it blocks the JobMaster from 
> processing other akka messages, such as heartbeats. The creation of the 
> TaskDeploymentDescriptor takes most of the time. We can put the creation in a future.
> For example, for a job [source(8000)->sink(8000)], it took more than 1 minute for 
> all 16000 tasks to go from SCHEDULED to DEPLOYING. This caused the TaskManager 
> heartbeat to time out and the job never succeeded.
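
A minimal sketch of the idea (the names createTaskDeploymentDescriptor, ioExecutor 
and jobMasterMainThreadExecutor are illustrative, not the actual Flink code):

{code:java}
// Build the TaskDeploymentDescriptor off the main thread and only hand the
// finished descriptor back to the main thread for submission, so heartbeats and
// other akka messages are not blocked while the descriptor is assembled.
CompletableFuture
        .supplyAsync(this::createTaskDeploymentDescriptor, ioExecutor)
        .thenAcceptAsync(
                tdd -> taskManagerGateway.submitTask(tdd, rpcTimeout),
                jobMasterMainThreadExecutor);
{code}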



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16069) Creation of TaskDeploymentDescriptor can block main thread for long time

2021-08-08 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17395728#comment-17395728
 ] 

Zhu Zhu commented on FLINK-16069:
-

Thanks for making the improvements and sharing the results! [~Thesharing]
The attached graphs show the improvement in TaskDeploymentDescriptor creation, 
and the table shows the end-to-end (E2E) deployment improvement.

> Creation of TaskDeploymentDescriptor can block main thread for long time
> 
>
> Key: FLINK-16069
> URL: https://issues.apache.org/jira/browse/FLINK-16069
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: huweihua
>Priority: Major
> Attachments: FLINK-16069-POC-results, batch.png, streaming.png
>
>
> The deployment of tasks can take a long time when we submit a high-parallelism 
> job. Execution#deploy runs in the main thread, so it blocks the JobMaster from 
> processing other akka messages, such as heartbeats. The creation of the 
> TaskDeploymentDescriptor takes most of the time. We can put the creation in a future.
> For example, for a job [source(8000)->sink(8000)], it took more than 1 minute for 
> all 16000 tasks to go from SCHEDULED to DEPLOYING. This caused the TaskManager 
> heartbeat to time out and the job never succeeded.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22227) Job name, Job ID and receiving Dispatcher should be logged by the client

2021-08-08 Thread Shen Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17395666#comment-17395666
 ] 

Shen Zhu commented on FLINK-22227:
--

Hey Chesnay,

I plan to add logging before this line in 
StreamExecutionEnvironment#executeAsync: 
[https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L2091]
 for the job ID (jobClient.getJobID()) and job name (streamGraph.getJobName()).
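
Roughly what I have in mind (just a sketch of the wording; LOG is assumed to be an 
SLF4J logger available in StreamExecutionEnvironment):

{code:java}
// Hypothetical log statement; jobClient and streamGraph are the local variables
// mentioned above, and the message wording here is only an example.
LOG.info(
        "Submitted job '{}' (job id: {}).",
        streamGraph.getJobName(),
        jobClient.getJobID());
{code}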

Do you think it's the correct place? Thanks!

Best Regards,
Shen Zhu 

> Job name, Job ID and receiving Dispatcher should be logged by the client
> 
>
> Key: FLINK-22227
> URL: https://issues.apache.org/jira/browse/FLINK-22227
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client / Job Submission
>Reporter: Chesnay Schepler
>Priority: Major
>
> Surprisingly, on job submission we don't log where we submit the job to or what 
> the job ID/name is.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22217) Add quotes around job names

2021-08-08 Thread Shen Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17395665#comment-17395665
 ] 

Shen Zhu commented on FLINK-22217:
--

Hey Chesnay,

I plan to modify this line in JobMaster#startJobExecution: 
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L871].
 Do you think that will work?
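
Something along these lines (the "before" line is paraphrased from memory, not 
copied from the current code):

{code:java}
// before (roughly)
log.info("Starting execution of job {} ({}) under job master id {}.",
        jobGraph.getName(), jobGraph.getJobID(), getFencingToken());

// after: quote the job name so that names containing spaces read unambiguously
log.info("Starting execution of job '{}' ({}) under job master id {}.",
        jobGraph.getName(), jobGraph.getJobID(), getFencingToken());
{code}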

Best Regards,
Shen Zhu

> Add quotes around job names
> ---
>
> Key: FLINK-22217
> URL: https://issues.apache.org/jira/browse/FLINK-22217
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Chesnay Schepler
>Priority: Major
>
> Quotes could be neat here:
> {code}
> Starting execution of job State machine job [..]
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23627) Migrate all OperatorMG instantiations to factory method

2021-08-07 Thread Shen Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17395338#comment-17395338
 ] 

Shen Zhu commented on FLINK-23627:
--

Hey Chesnay,

I'm interested in working on this ticket and have created a PR for it: 
[https://github.com/apache/flink/pull/16750]. Could you please assign the ticket to me?

Thanks for your help.

Best Regards,
Shen Zhu

> Migrate all OperatorMG instantiations to factory method
> ---
>
> Key: FLINK-23627
> URL: https://issues.apache.org/jira/browse/FLINK-23627
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Metrics
>Reporter: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available, starter
> Fix For: 1.14.0
>
>
> Modify all existing usages of the OperatorMG constructor to use runtime APIs, 
> for consistency and to make constructor changes easier.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23625) Migrate all TaskManagerJobMG instantiations to factory method

2021-08-05 Thread Shen Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17394498#comment-17394498
 ] 

Shen Zhu commented on FLINK-23625:
--

Hey Chesnay,

I'm interested in working on this ticket. If possible, could you please assign 
it to me?

Best Regards,
Shen Zhu

> Migrate all TaskManagerJobMG instantiations to factory method
> -
>
> Key: FLINK-23625
> URL: https://issues.apache.org/jira/browse/FLINK-23625
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Metrics
>Reporter: Chesnay Schepler
>Priority: Major
>
> Modify all existing usages of the TaskManagerJobMG constructor to use runtime 
> APIs, for consistency and to make constructor changes easier.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-11634) Translate "State Backends" page into Chinese

2021-08-05 Thread Shen Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17394276#comment-17394276
 ] 

Shen Zhu commented on FLINK-11634:
--

Hey [~jark],

I have created a PR for this ticket: 
[https://github.com/apache/flink/pull/16522]. Can someone review this 
PR? Thanks!

Best Regards,
Shen Zhu

> Translate "State Backends" page into Chinese
> 
>
> Key: FLINK-11634
> URL: https://issues.apache.org/jira/browse/FLINK-11634
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Congxian Qiu
>Assignee: Shen Zhu
>Priority: Major
>  Labels: auto-unassigned, pull-request-available
>
> The doc is located at flink/docs/dev/stream/state/state_backends.md



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-23593) Performance regression on 15.07.2021

2021-08-05 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17393950#comment-17393950
 ] 

Zhu Zhu edited comment on FLINK-23593 at 8/5/21, 12:19 PM:
---

>> Could the larger difference between local benchmark vs. cloud be that the 
>> cloud is running with regular HDDs and we always spill to disk because 
>> SORT_SPILLING_THRESHOLD is set to 0?

Maybe, yes. The record processing time can be shorter on an SSD, so the 
increased initialization time (described in *Trying to explain the Regression*) 
will be more obvious.

Another similar suspicion is that the flink-benchmarks 
[patch|https://github.com/twalthr/flink-benchmarks/commit/dfe3cad86030b551daaa7c4a5951a6e4c06fc061]
 increased `RECORDS_PER_INVOCATION` from 1_500_000 to 3_000_000. This increased 
the processing time and may make the regression in initialization time less obvious.


was (Author: zhuzh):
>> Could the larger difference between local benchmark vs. cloud be that the 
>> cloud is running with regular HDDs and we always spill to disk because 
>> SORT_SPILLING_THRESHOLD is set to 0?

Maybe, yes. The record processing time can be shorter on an SSD, so the 
increased initialization time (described in *Trying to explain the Regression*) 
will be more obvious.

Another similar suspicion is that the flink-benchmarks 
[patch|https://github.com/twalthr/flink-benchmarks/commit/dfe3cad86030b551daaa7c4a5951a6e4c06fc061]
 increased `RECORDS_PER_INVOCATION` from 1_500_000 to 3_000_000. The increased 
processing time may make the regression in initialization time less obvious.

> Performance regression on 15.07.2021
> 
>
> Key: FLINK-23593
> URL: https://issues.apache.org/jira/browse/FLINK-23593
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Benchmarks
>Affects Versions: 1.14.0
>Reporter: Piotr Nowojski
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.14.0
>
>
> http://codespeed.dak8s.net:8000/timeline/?ben=sortedMultiInput&env=2
> http://codespeed.dak8s.net:8000/timeline/?ben=sortedTwoInput&env=2
> {noformat}
> pnowojski@piotr-mbp: [~/flink -  ((no branch, bisect started on pr/16589))] $ 
> git ls f4afbf3e7de..eb8100f7afe
> eb8100f7afe [4 weeks ago] (pn/bad, bad, refs/bisect/bad) 
> [FLINK-22017][coordination] Allow BLOCKING result partition to be 
> individually consumable [Thesharing]
> d2005268b1e [4 weeks ago] (HEAD, pn/bisect-4, bisect-4) 
> [FLINK-22017][coordination] Get the ConsumedPartitionGroup that 
> IntermediateResultPartition and DefaultResultPartition belong to [Thesharing]
> d8b1a6fd368 [3 weeks ago] [FLINK-23372][streaming-java] Disable 
> AllVerticesInSameSlotSharingGroupByDefault in batch mode [Timo Walther]
> 4a78097d038 [3 weeks ago] (pn/bisect-3, bisect-3, 
> refs/bisect/good-4a78097d0385749daceafd8326930c8cc5f26f1a) 
> [FLINK-21928][clients][runtime] Introduce static method constructors of 
> DuplicateJobSubmissionException for better readability. [David Moravek]
> 172b9e32215 [3 weeks ago] [FLINK-21928][clients] JobManager failover should 
> succeed, when trying to resubmit already terminated job in application mode. 
> [David Moravek]
> f483008db86 [3 weeks ago] [FLINK-21928][core] Introduce 
> org.apache.flink.util.concurrent.FutureUtils#handleException method, that 
> allows future to recover from the specied exception. [David Moravek]
> d7ac08c2ac0 [3 weeks ago] (pn/bisect-2, bisect-2, 
> refs/bisect/good-d7ac08c2ac06b9ff31707f3b8f43c07817814d4f) 
> [FLINK-22843][docs-zh] Document and code are inconsistent [ZhiJie Yang]
> 16c3ea427df [3 weeks ago] [hotfix] Split the final checkpoint related tests 
> to a separate test class. [Yun Gao]
> 31b3d37a22c [7 weeks ago] [FLINK-21089][runtime] Skip the execution of new 
> sources if finished on restore [Yun Gao]
> 20fe062e1b5 [3 weeks ago] [FLINK-21089][runtime] Skip execution for the 
> legacy source task if finished on restore [Yun Gao]
> 874c627114b [3 weeks ago] [FLINK-21089][runtime] Skip the lifecycle method of 
> operators if finished on restore [Yun Gao]
> ceaf24b1d88 [3 weeks ago] (pn/bisect-1, bisect-1, 
> refs/bisect/good-ceaf24b1d881c2345a43f305d40435519a09cec9) [hotfix] Fix 
> isClosed() for operator wrapper and proxy operator close to the operator 
> chain [Yun Gao]
> 41ea591a6db [3 weeks ago] [FLINK-22627][runtime] Remove unused slot request 
> protocol [Yangze Guo]
> 489346b60f8 [3 months ago] [FLINK-22627][runtime] Remove PendingSlotRequest 
> [Yangze Guo]
> 8ffb4d2af36 [3 months ago] [FLINK-22627][runtime] Remove TaskManagerSlot 
> [Yangze Guo]
> 72073741588 [3 months ago] [FLINK-22627][runtime] Remove SlotManagerImpl and 
> its related tests [Yangze Guo]
> bdb3b7541b3 [3 months ago] [hotfix][yarn] Remove unused internal options in 
> YarnConfigOptionsInternal [Yangze Guo]
> a6a9b192eac [3 weeks ago] [FLINK-23201][streaming] Reset alignment only for 
> the currently processed checkpoint [Anton Kalashnikov]
> b35701a35c7 [3 weeks ago] [FLINK-23201][streaming] Calculate checkpoint 
> alignment time only for last started checkpoint [Anton Kalashnikov]
> 3abec22c536 [3 weeks ago] [FLINK-23107][table-runtime] Separate 
> implementation of deduplicate rank from other rank functions [Shuo Cheng]
> 1a195f5cc59 [3 weeks ago] [FLINK-16093][docs-zh] Translate "System Functions" 
> page of "Functions" into Chinese (#16348) [ZhiJie Yang]
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

[jira] [Commented] (FLINK-23593) Performance regression on 15.07.2021

2021-08-05 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17393950#comment-17393950
 ] 

Zhu Zhu commented on FLINK-23593:
-

>> Could the larger difference between local benchmark vs. cloud be that the 
>> cloud is running with regular HDDs and we always spill to disk because 
>> SORT_SPILLING_THRESHOLD is set to 0?

Maybe, yes. The record processing time can be shorter on an SSD, so the 
increased initialization time (described in *Trying to explain the Regression*) 
will be more obvious.

Another similar suspicion is that the flink-benchmarks 
[patch|https://github.com/twalthr/flink-benchmarks/commit/dfe3cad86030b551daaa7c4a5951a6e4c06fc061]
 increased `RECORDS_PER_INVOCATION` from 1_500_000 to 3_000_000. The increased 
processing time may make the regression in initialization time less obvious.

> Performance regression on 15.07.2021
> 
>
> Key: FLINK-23593
> URL: https://issues.apache.org/jira/browse/FLINK-23593
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Benchmarks
>Affects Versions: 1.14.0
>Reporter: Piotr Nowojski
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.14.0
>
>
> http://codespeed.dak8s.net:8000/timeline/?ben=sortedMultiInput&env=2
> http://codespeed.dak8s.net:8000/timeline/?ben=sortedTwoInput&env=2
> {noformat}
> pnowojski@piotr-mbp: [~/flink -  ((no branch, bisect started on pr/16589))] $ 
> git ls f4afbf3e7de..eb8100f7afe
> eb8100f7afe [4 weeks ago] (pn/bad, bad, refs/bisect/bad) 
> [FLINK-22017][coordination] Allow BLOCKING result partition to be 
> individually consumable [Thesharing]
> d2005268b1e [4 weeks ago] (HEAD, pn/bisect-4, bisect-4) 
> [FLINK-22017][coordination] Get the ConsumedPartitionGroup that 
> IntermediateResultPartition and DefaultResultPartition belong to [Thesharing]
> d8b1a6fd368 [3 weeks ago] [FLINK-23372][streaming-java] Disable 
> AllVerticesInSameSlotSharingGroupByDefault in batch mode [Timo Walther]
> 4a78097d038 [3 weeks ago] (pn/bisect-3, bisect-3, 
> refs/bisect/good-4a78097d0385749daceafd8326930c8cc5f26f1a) 
> [FLINK-21928][clients][runtime] Introduce static method constructors of 
> DuplicateJobSubmissionException for better readability. [David Moravek]
> 172b9e32215 [3 weeks ago] [FLINK-21928][clients] JobManager failover should 
> succeed, when trying to resubmit already terminated job in application mode. 
> [David Moravek]
> f483008db86 [3 weeks ago] [FLINK-21928][core] Introduce 
> org.apache.flink.util.concurrent.FutureUtils#handleException method, that 
> allows future to recover from the specied exception. [David Moravek]
> d7ac08c2ac0 [3 weeks ago] (pn/bisect-2, bisect-2, 
> refs/bisect/good-d7ac08c2ac06b9ff31707f3b8f43c07817814d4f) 
> [FLINK-22843][docs-zh] Document and code are inconsistent [ZhiJie Yang]
> 16c3ea427df [3 weeks ago] [hotfix] Split the final checkpoint related tests 
> to a separate test class. [Yun Gao]
> 31b3d37a22c [7 weeks ago] [FLINK-21089][runtime] Skip the execution of new 
> sources if finished on restore [Yun Gao]
> 20fe062e1b5 [3 weeks ago] [FLINK-21089][runtime] Skip execution for the 
> legacy source task if finished on restore [Yun Gao]
> 874c627114b [3 weeks ago] [FLINK-21089][runtime] Skip the lifecycle method of 
> operators if finished on restore [Yun Gao]
> ceaf24b1d88 [3 weeks ago] (pn/bisect-1, bisect-1, 
> refs/bisect/good-ceaf24b1d881c2345a43f305d40435519a09cec9) [hotfix] Fix 
> isClosed() for operator wrapper and proxy operator close to the operator 
> chain [Yun Gao]
> 41ea591a6db [3 weeks ago] [FLINK-22627][runtime] Remove unused slot request 
> protocol [Yangze Guo]
> 489346b60f8 [3 months ago] [FLINK-22627][runtime] Remove PendingSlotRequest 
> [Yangze Guo]
> 8ffb4d2af36 [3 months ago] [FLINK-22627][runtime] Remove TaskManagerSlot 
> [Yangze Guo]
> 72073741588 [3 months ago] [FLINK-22627][runtime] Remove SlotManagerImpl and 
> its related tests [Yangze Guo]
> bdb3b7541b3 [3 months ago] [hotfix][yarn] Remove unused internal options in 
> YarnConfigOptionsInternal [Yangze Guo]
> a6a9b192eac [3 weeks ago] [FLINK-23201][streaming] Reset alignment only for 
> the currently processed checkpoint [Anton Kalashnikov]
> b35701a35c7 [3 weeks ago] [FLINK-23201][streaming] Calculate checkpoint 
> alignment time only for last started checkpoint [Anton Kalashnikov]
> 3abec22c536 [3 weeks ago] [FLINK-23107][table-runtime] Separate 
> implementation of deduplicate rank from other rank functions [Shuo Cheng]
> 1a195f5cc59 [3 weeks ago] [FLINK-16093][docs-zh] Translate "System Functions" 
> page of "Functions" into Chinese (#16348) [ZhiJie Yang]
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23172) Links to Task Failure Recovery page on Configuration page are broken

2021-08-05 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-23172:

Affects Version/s: 1.13.2

> Links to Task Failure Recovery page on Configuration page are broken
> 
>
> Key: FLINK-23172
> URL: https://issues.apache.org/jira/browse/FLINK-23172
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.14.0, 1.13.2
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> The links to [Task Failure 
> Recovery|https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/task_failure_recovery/]
>  page inside [Fault 
> Tolerance|https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#fault-tolerance]
>  section and [Advanced Fault Tolerance 
> Options|https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#advanced-fault-tolerance-options]
>  section on the 
> [Configuration|https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#fault-tolerance/]
>  page are broken.
> Let's take an example. In the description of {{restart-strategy}}, the link 
> for {{fixed-delay}} currently refers to 
> [https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/dev/task_failure_recovery.html#fixed-delay-restart-strategy],
>  which doesn't exist and leads to a 404 error. The correct link is 
> [https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/task_failure_recovery/#fixed-delay-restart-strategy].
> The links are located in {{RestartStrategyOptions.java}} and 
> {{JobManagerOptions.java}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23593) Performance regression on 15.07.2021

2021-08-05 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17393836#comment-17393836
 ] 

Zhu Zhu commented on FLINK-23593:
-

Thanks for the updates! [~sewen]

I think your guess about *Trying to explain the Regression* may be right.
However, it looks a bit weird to me that there is a ~10% regression of 
[sortedTwoInput|http://codespeed.dak8s.net:8000/timeline/?ben=sortedTwoInput], 
while Timo's benchmark shows that enabling slot sharing or not results in only 
~2% performance difference. So I'm a bit concerned that there may be 
unexpected problems.

I will try these benchmarks locally to see if I can find anything.


> Performance regression on 15.07.2021
> 
>
> Key: FLINK-23593
> URL: https://issues.apache.org/jira/browse/FLINK-23593
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Benchmarks
>Affects Versions: 1.14.0
>Reporter: Piotr Nowojski
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.14.0
>
>
> http://codespeed.dak8s.net:8000/timeline/?ben=sortedMultiInput&env=2
> http://codespeed.dak8s.net:8000/timeline/?ben=sortedTwoInput&env=2
> {noformat}
> pnowojski@piotr-mbp: [~/flink -  ((no branch, bisect started on pr/16589))] $ 
> git ls f4afbf3e7de..eb8100f7afe
> eb8100f7afe [4 weeks ago] (pn/bad, bad, refs/bisect/bad) 
> [FLINK-22017][coordination] Allow BLOCKING result partition to be 
> individually consumable [Thesharing]
> d2005268b1e [4 weeks ago] (HEAD, pn/bisect-4, bisect-4) 
> [FLINK-22017][coordination] Get the ConsumedPartitionGroup that 
> IntermediateResultPartition and DefaultResultPartition belong to [Thesharing]
> d8b1a6fd368 [3 weeks ago] [FLINK-23372][streaming-java] Disable 
> AllVerticesInSameSlotSharingGroupByDefault in batch mode [Timo Walther]
> 4a78097d038 [3 weeks ago] (pn/bisect-3, bisect-3, 
> refs/bisect/good-4a78097d0385749daceafd8326930c8cc5f26f1a) 
> [FLINK-21928][clients][runtime] Introduce static method constructors of 
> DuplicateJobSubmissionException for better readability. [David Moravek]
> 172b9e32215 [3 weeks ago] [FLINK-21928][clients] JobManager failover should 
> succeed, when trying to resubmit already terminated job in application mode. 
> [David Moravek]
> f483008db86 [3 weeks ago] [FLINK-21928][core] Introduce 
> org.apache.flink.util.concurrent.FutureUtils#handleException method, that 
> allows future to recover from the specied exception. [David Moravek]
> d7ac08c2ac0 [3 weeks ago] (pn/bisect-2, bisect-2, 
> refs/bisect/good-d7ac08c2ac06b9ff31707f3b8f43c07817814d4f) 
> [FLINK-22843][docs-zh] Document and code are inconsistent [ZhiJie Yang]
> 16c3ea427df [3 weeks ago] [hotfix] Split the final checkpoint related tests 
> to a separate test class. [Yun Gao]
> 31b3d37a22c [7 weeks ago] [FLINK-21089][runtime] Skip the execution of new 
> sources if finished on restore [Yun Gao]
> 20fe062e1b5 [3 weeks ago] [FLINK-21089][runtime] Skip execution for the 
> legacy source task if finished on restore [Yun Gao]
> 874c627114b [3 weeks ago] [FLINK-21089][runtime] Skip the lifecycle method of 
> operators if finished on restore [Yun Gao]
> ceaf24b1d88 [3 weeks ago] (pn/bisect-1, bisect-1, 
> refs/bisect/good-ceaf24b1d881c2345a43f305d40435519a09cec9) [hotfix] Fix 
> isClosed() for operator wrapper and proxy operator close to the operator 
> chain [Yun Gao]
> 41ea591a6db [3 weeks ago] [FLINK-22627][runtime] Remove unused slot request 
> protocol [Yangze Guo]
> 489346b60f8 [3 months ago] [FLINK-22627][runtime] Remove PendingSlotRequest 
> [Yangze Guo]
> 8ffb4d2af36 [3 months ago] [FLINK-22627][runtime] Remove TaskManagerSlot 
> [Yangze Guo]
> 72073741588 [3 months ago] [FLINK-22627][runtime] Remove SlotManagerImpl and 
> its related tests [Yangze Guo]
> bdb3b7541b3 [3 months ago] [hotfix][yarn] Remove unused internal options in 
> YarnConfigOptionsInternal [Yangze Guo]
> a6a9b192eac [3 weeks ago] [FLINK-23201][streaming] Reset alignment only for 
> the currently processed checkpoint [Anton Kalashnikov]
> b35701a35c7 [3 weeks ago] [FLINK-23201][streaming] Calculate checkpoint 
> alignment time only for last started checkpoint [Anton Kalashnikov]
> 3abec22c536 [3 weeks ago] [FLINK-23107][table-runtime] Separate 
> implementation of deduplicate rank from other rank functions [Shuo Cheng]
> 1a195f5cc59 [3 weeks ago] [FLINK-16093][docs-zh] Translate "System Functions" 
> page of "Functions" into Chinese (#16348) [ZhiJie Yang]
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22674) Provide JobID when apply shuffle resource by ShuffleMaster

2021-08-04 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-22674:

Fix Version/s: 1.14.0

> Provide JobID when apply shuffle resource by ShuffleMaster
> --
>
> Key: FLINK-22674
> URL: https://issues.apache.org/jira/browse/FLINK-22674
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Jin Xing
>Assignee: Yingjie Cao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> In the current Flink 'pluggable shuffle service' framework, only 
> PartitionDescriptor and ProducerDescriptor are included as parameters in 
> ShuffleMaster#registerPartitionWithProducer.
> But when extending a remote shuffle service based on the 'pluggable shuffle 
> service', the JobID is also needed when applying for shuffle resources from the 
> remote cluster. It can be used as an identifier to link shuffle resources with 
> the corresponding job:
>  # The remote shuffle cluster can isolate or apply capacity control on shuffle 
> resources between jobs;
>  # The remote shuffle cluster can use the JobID to clean up shuffle data when a 
> job is lost, thus avoiding file leaks;
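
A sketch of the proposed signature change (illustrative only; the final API may differ):

{code:java}
public interface ShuffleMaster<T extends ShuffleDescriptor> {

    // The JobID is passed along so that a remote shuffle cluster can attribute the
    // requested shuffle resources to the owning job (isolation, capacity control,
    // cleanup when the job is lost).
    CompletableFuture<T> registerPartitionWithProducer(
            JobID jobID,
            PartitionDescriptor partitionDescriptor,
            ProducerDescriptor producerDescriptor);
}
{code}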



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23214) Make ShuffleMaster a cluster level shared service

2021-08-04 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-23214:

Fix Version/s: 1.14.0

> Make ShuffleMaster a cluster level shared service
> -
>
> Key: FLINK-23214
> URL: https://issues.apache.org/jira/browse/FLINK-23214
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Runtime / Network
>Affects Versions: 1.14.0
>Reporter: Yingjie Cao
>Assignee: Yingjie Cao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> This ticket tries to make ShuffleMaster a cluster-level shared service, which 
> makes it consistent with the ShuffleEnvironment on the TM side.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-23249) Introduce ShuffleMasterContext to ShuffleMaster

2021-08-04 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-23249.
---
Resolution: Done

Done via 0ee4038ef596b22630bc814f677fa489d3796241

> Introduce ShuffleMasterContext to ShuffleMaster
> ---
>
> Key: FLINK-23249
> URL: https://issues.apache.org/jira/browse/FLINK-23249
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Runtime / Network
>Reporter: Yingjie Cao
>Assignee: Yingjie Cao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> Introduce ShuffleMasterContext to ShuffleMaster. Just like the 
> ShuffleEnvironmentContext on the TaskManager side, the ShuffleMasterContext 
> can act as a proxy between the ShuffleMaster and other Flink components such as 
> the ResourceManagerPartitionTracker.
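
A rough sketch of what such a context could expose (illustrative, not necessarily 
the final API):

{code:java}
public interface ShuffleMasterContext {

    /** Cluster configuration, so the ShuffleMaster can read its own options. */
    Configuration getConfiguration();

    /** Propagates a fatal error to the cluster framework, mirroring the TM-side context. */
    void onFatalError(Throwable throwable);
}
{code}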



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22675) Add lifecycle methods to ShuffleMaster

2021-08-04 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-22675:

Fix Version/s: 1.14.0

> Add lifecycle methods to ShuffleMaster
> --
>
> Key: FLINK-22675
> URL: https://issues.apache.org/jira/browse/FLINK-22675
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Jin Xing
>Assignee: Yingjie Cao
>Priority: Major
> Fix For: 1.14.0
>
>
> When extending a remote shuffle service based on the 'pluggable shuffle service', 
> the ShuffleMaster talks with the remote cluster over a network connection. This 
> Jira proposes to add an interface method, ShuffleMaster#close, which can be 
> extended to do cleanup work and will be called when the Flink application is 
> closed.
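
For example, the close hook could look roughly like this (a default method keeps 
existing implementations source compatible; this is a sketch, not the final API):

{code:java}
public interface ShuffleMaster<T extends ShuffleDescriptor> extends AutoCloseable {

    /** Called when the Flink application is closed; cleans up external resources. */
    @Override
    default void close() throws Exception {}
}
{code}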



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-23214) Make ShuffleMaster a cluster level shared service

2021-08-04 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-23214.
---
Resolution: Done

Done via 81e1db3c439c1758dccd1a20f2f6b70120f48ef7

> Make ShuffleMaster a cluster level shared service
> -
>
> Key: FLINK-23214
> URL: https://issues.apache.org/jira/browse/FLINK-23214
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Runtime / Network
>Affects Versions: 1.14.0
>Reporter: Yingjie Cao
>Assignee: Yingjie Cao
>Priority: Major
>  Labels: pull-request-available
>
> This ticket tries to make ShuffleMaster a cluster-level shared service, which 
> makes it consistent with the ShuffleEnvironment on the TM side.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-22674) Provide JobID when apply shuffle resource by ShuffleMaster

2021-08-04 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-22674.
---
Resolution: Done

Done via 6bc8399e7f1738ec22cb1082c096269b5106cee5

> Provide JobID when apply shuffle resource by ShuffleMaster
> --
>
> Key: FLINK-22674
> URL: https://issues.apache.org/jira/browse/FLINK-22674
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Jin Xing
>Assignee: Yingjie Cao
>Priority: Major
>  Labels: pull-request-available
>
> In the current Flink 'pluggable shuffle service' framework, only 
> PartitionDescriptor and ProducerDescriptor are included as parameters in 
> ShuffleMaster#registerPartitionWithProducer.
> But when extending a remote shuffle service based on the 'pluggable shuffle 
> service', the JobID is also needed when applying for shuffle resources from the 
> remote cluster. It can be used as an identifier to link shuffle resources with 
> the corresponding job:
>  # The remote shuffle cluster can isolate or apply capacity control on shuffle 
> resources between jobs;
>  # The remote shuffle cluster can use the JobID to clean up shuffle data when a 
> job is lost, thus avoiding file leaks;



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22675) Add lifecycle methods to ShuffleMaster

2021-08-04 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-22675:
---

Assignee: Yingjie Cao  (was: Zhu Zhu)

> Add lifecycle methods to ShuffleMaster
> --
>
> Key: FLINK-22675
> URL: https://issues.apache.org/jira/browse/FLINK-22675
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Jin Xing
>Assignee: Yingjie Cao
>Priority: Major
>
> When extending a remote shuffle service based on the 'pluggable shuffle service', 
> the ShuffleMaster talks with the remote cluster over a network connection. This 
> Jira proposes to add an interface method, ShuffleMaster#close, which can be 
> extended to do cleanup work and will be called when the Flink application is 
> closed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-22675) Add lifecycle methods to ShuffleMaster

2021-08-04 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-22675.
---
Resolution: Done

Done via 80df36b51af791f67126e07e182015ea6ea73fd2

> Add lifecycle methods to ShuffleMaster
> --
>
> Key: FLINK-22675
> URL: https://issues.apache.org/jira/browse/FLINK-22675
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Jin Xing
>Assignee: Yingjie Cao
>Priority: Major
>
> When extending a remote shuffle service based on the 'pluggable shuffle service', 
> the ShuffleMaster talks with the remote cluster over a network connection. This 
> Jira proposes to add an interface method, ShuffleMaster#close, which can be 
> extended to do cleanup work and will be called when the Flink application is 
> closed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22674) Provide JobID when apply shuffle resource by ShuffleMaster

2021-08-04 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-22674:
---

Assignee: Yingjie Cao

> Provide JobID when apply shuffle resource by ShuffleMaster
> --
>
> Key: FLINK-22674
> URL: https://issues.apache.org/jira/browse/FLINK-22674
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Jin Xing
>Assignee: Yingjie Cao
>Priority: Major
>  Labels: pull-request-available
>
> In the current Flink 'pluggable shuffle service' framework, only 
> PartitionDescriptor and ProducerDescriptor are included as parameters in 
> ShuffleMaster#registerPartitionWithProducer.
> But when extending a remote shuffle service based on the 'pluggable shuffle 
> service', the JobID is also needed when applying for shuffle resources from the 
> remote cluster. It can be used as an identifier to link shuffle resources with 
> the corresponding job:
>  # The remote shuffle cluster can isolate or apply capacity control on shuffle 
> resources between jobs;
>  # The remote shuffle cluster can use the JobID to clean up shuffle data when a 
> job is lost, thus avoiding file leaks;



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22675) Add lifecycle methods to ShuffleMaster

2021-08-04 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-22675:
---

Assignee: Zhu Zhu

> Add lifecycle methods to ShuffleMaster
> --
>
> Key: FLINK-22675
> URL: https://issues.apache.org/jira/browse/FLINK-22675
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Jin Xing
>Assignee: Zhu Zhu
>Priority: Major
>
> When extending a remote shuffle service based on the 'pluggable shuffle service', 
> the ShuffleMaster talks with the remote cluster over a network connection. This 
> Jira proposes to add an interface method, ShuffleMaster#close, which can be 
> extended to do cleanup work and will be called when the Flink application is 
> closed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-23214) Make ShuffleMaster a cluster level shared service

2021-08-04 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-23214:
---

Assignee: Yingjie Cao

> Make ShuffleMaster a cluster level shared service
> -
>
> Key: FLINK-23214
> URL: https://issues.apache.org/jira/browse/FLINK-23214
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Runtime / Network
>Affects Versions: 1.14.0
>Reporter: Yingjie Cao
>Assignee: Yingjie Cao
>Priority: Major
>  Labels: pull-request-available
>
> This ticket tries to make ShuffleMaster a cluster-level shared service, which 
> makes it consistent with the ShuffleEnvironment on the TM side.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-23249) Introduce ShuffleMasterContext to ShuffleMaster

2021-08-04 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-23249:
---

Assignee: Yingjie Cao

> Introduce ShuffleMasterContext to ShuffleMaster
> ---
>
> Key: FLINK-23249
> URL: https://issues.apache.org/jira/browse/FLINK-23249
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Runtime / Network
>Reporter: Yingjie Cao
>Assignee: Yingjie Cao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> Introduce ShuffleMasterContext to ShuffleMaster. Just like the 
> ShuffleEnvironmentContext on the TaskManager side, the ShuffleMasterContext 
> can act as a proxy between the ShuffleMaster and other Flink components such as 
> the ResourceManagerPartitionTracker.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22910) Refine ShuffleMaster lifecycle management for pluggable shuffle service framework

2021-08-04 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-22910:
---

Assignee: Yingjie Cao

> Refine ShuffleMaster lifecycle management for pluggable shuffle service 
> framework
> -
>
> Key: FLINK-22910
> URL: https://issues.apache.org/jira/browse/FLINK-22910
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Yingjie Cao
>Assignee: Yingjie Cao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> The current _ShuffleMaster_ has an unclear lifecycle, which is inconsistent 
> with the _ShuffleEnvironment_ on the _TM_ side. Besides, it is hard to 
> implement some important capabilities for a remote shuffle service, for 
> example: 1) releasing external resources when a job finishes; 2) stopping or 
> starting to track some partitions depending on the status of the external 
> service or system.
> We drafted a document[1] which proposes some simple changes to solve these 
> issues. The document is not fully complete yet. We will start a 
> discussion once it is finished.
>  
> [1] 
> https://docs.google.com/document/d/1_cHoapNbx_fJ7ZNraSqw4ZK1hMRiWWJDITuSZrdMDDs/edit?usp=sharing



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23590) StreamTaskTest#testProcessWithUnAvailableInput is flaky

2021-08-03 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392737#comment-17392737
 ] 

Zhu Zhu commented on FLINK-23590:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=21450&view=logs&j=a57e0635-3fad-5b08-57c7-a4142d7d6fa9&t=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7

> StreamTaskTest#testProcessWithUnAvailableInput is flaky
> ---
>
> Key: FLINK-23590
> URL: https://issues.apache.org/jira/browse/FLINK-23590
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.14.0
>Reporter: David Morávek
>Assignee: Anton Kalashnikov
>Priority: Critical
> Fix For: 1.14.0
>
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=21218&view=logs&j=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5&t=54421a62-0c80-5aad-3319-094ff69180bb]
>  
> {code:java}
> java.lang.AssertionError: 
> Expected: a value equal to or greater than <22L>
>  but: <217391L> was less than <22L>   at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.junit.Assert.assertThat(Assert.java:964)
>   at org.junit.Assert.assertThat(Assert.java:930)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskTest.testProcessWithUnAvailableInput(StreamTaskTest.java:1561)
>   at jdk.internal.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at java.base/java.lang.Thread.run(Thread.java:829){code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-23172) Links to Task Failure Recovery page on Configuration page are broken

2021-08-03 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-23172.
---
Resolution: Fixed

Fixed via 
5183b2af9d467708725bd1454a671bc7689159a5
46bf6d68ee97684949ba3ad38dc18ff7c800092a

> Links to Task Failure Recovery page on Configuration page are broken
> 
>
> Key: FLINK-23172
> URL: https://issues.apache.org/jira/browse/FLINK-23172
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.14.0
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> The links to [Task Failure 
> Recovery|https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/task_failure_recovery/]
>  page inside [Fault 
> Tolerance|https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#fault-tolerance]
>  section and [Advanced Fault Tolerance 
> Options|https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#advanced-fault-tolerance-options]
>  section on the 
> [Configuration|https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#fault-tolerance/]
>  page are broken.
> Let's take an example. In the description of {{restart-strategy}}, the link 
> for {{fixed-delay}} currently refers to 
> [https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/dev/task_failure_recovery.html#fixed-delay-restart-strategy],
>  which doesn't exist and leads to a 404 error. The correct link is 
> [https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/task_failure_recovery/#fixed-delay-restart-strategy].
> The links are located in {{RestartStrategyOptions.java}} and 
> {{JobManagerOptions.java}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22767) Optimize the initialization of LocalInputPreferredSlotSharingStrategy

2021-08-03 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-22767:
---

Assignee: Zhilong Hong

> Optimize the initialization of LocalInputPreferredSlotSharingStrategy
> -
>
> Key: FLINK-22767
> URL: https://issues.apache.org/jira/browse/FLINK-22767
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.13.0
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Major
>  Labels: pull-request-available
>
> Based on the scheduler benchmarks introduced in FLINK-21731, we found that 
> during the initialization of {{LocalInputPreferredSlotSharingStrategy}} 
> there is a procedure with O(N^2) complexity: 
> {{ExecutionSlotSharingGroupBuilder#tryFindAvailableProducerExecutionSlotSharingGroupFor}},
>  located in {{LocalInputPreferredSlotSharingStrategy}}.
> The original implementation is: 
> {code:java}
> for all SchedulingExecutionVertex in DefaultScheduler:
>   for all consumed SchedulingResultPartition of the SchedulingExecutionVertex:
> get the result partition's producer vertex and determine whether the 
> ExecutionSlotSharingGroup where the producer vertex is located is available for 
> the current vertex{code}
> This procedure has O(N^2) complexity.
> It's obvious that the result partitions in the same ConsumedPartitionGroup 
> have the same producer vertex. So we can just iterate over the 
> ConsumedPartitionGroups instead of over all the consumed partitions. This will 
> decrease the complexity from O(N^2) to O(N).
> The optimization of this procedure will speed up the initialization of the 
> DefaultScheduler. It will accelerate the submission of a new job, especially 
> for OLAP jobs.
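
An illustrative Java-style sketch of the optimized lookup (simplified types; the 
helpers resultPartitionRetriever, executionSlotSharingGroupMap and 
isGroupAvailableForVertex are made up for this example):

{code:java}
// Check one representative partition per ConsumedPartitionGroup instead of every
// consumed partition: all partitions in a group share the same producer vertex.
for (ConsumedPartitionGroup group : vertex.getConsumedPartitionGroups()) {
    IntermediateResultPartitionID anyPartition = group.getFirst();
    ExecutionVertexID producerId =
            resultPartitionRetriever.apply(anyPartition).getProducer().getId();
    ExecutionSlotSharingGroup candidate = executionSlotSharingGroupMap.get(producerId);
    if (candidate != null && isGroupAvailableForVertex(candidate, vertex)) {
        return candidate;
    }
}
return null;
{code}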



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-23599) Remove JobVertex#connectIdInput

2021-08-03 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-23599.
---
Resolution: Done

Done via ec9ff1ee5e33529260d6a3adfad4b0b34efde55e

> Remove JobVertex#connectIdInput
> ---
>
> Key: FLINK-23599
> URL: https://issues.apache.org/jira/browse/FLINK-23599
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> {{JobVertex#connectIdInput}} is not used in production anymore. It's only 
> used in the unit tests {{testAttachViaIds}} and 
> {{testCannotConnectMissingId}} located in 
> {{DefaultExecutionGraphConstructionTest}}. However, these two test cases are 
> designed to test this method. Therefore, this method and its test cases can 
> be removed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-23599) Remove JobVertex#connectIdInput

2021-08-03 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-23599:
---

Assignee: Zhilong Hong

> Remove JobVertex#connectIdInput
> ---
>
> Key: FLINK-23599
> URL: https://issues.apache.org/jira/browse/FLINK-23599
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Major
> Fix For: 1.14.0
>
>
> {{JobVertex#connectIdInput}} is not used in production anymore. It's only 
> used in the unit tests {{testAttachViaIds}} and 
> {{testCannotConnectMissingId}} located in 
> {{DefaultExecutionGraphConstructionTest}}. However, these two test cases are 
> designed to test this method. Therefore, this method and its test cases can 
> be removed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23593) Performance regression on 15.07.2021

2021-08-03 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392091#comment-17392091
 ] 

Zhu Zhu commented on FLINK-23593:
-

I'd like to understand why FLINK-23372 causes the regression before 
deciding whether it is really a problem.
I do not know much about these two benchmarks. IIUC, the mentioned benchmarks 
are for batch jobs only.
After FLINK-23372, the upstream and downstream tasks of batch jobs no longer share 
slots. So maybe the reason is that upstream and downstream tasks are deployed 
to different task managers and cannot send data via local input channels?

[~twalthr] do you think this is the cause? or any other suspicions?


> Performance regression on 15.07.2021
> 
>
> Key: FLINK-23593
> URL: https://issues.apache.org/jira/browse/FLINK-23593
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Benchmarks
>Affects Versions: 1.14.0
>Reporter: Piotr Nowojski
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.14.0
>
>
> http://codespeed.dak8s.net:8000/timeline/?ben=sortedMultiInput&env=2
> http://codespeed.dak8s.net:8000/timeline/?ben=sortedTwoInput&env=2
> {noformat}
> pnowojski@piotr-mbp: [~/flink -  ((no branch, bisect started on pr/16589))] $ 
> git ls f4afbf3e7de..eb8100f7afe
> eb8100f7afe [4 weeks ago] (pn/bad, bad, refs/bisect/bad) 
> [FLINK-22017][coordination] Allow BLOCKING result partition to be 
> individually consumable [Thesharing]
> d2005268b1e [4 weeks ago] (HEAD, pn/bisect-4, bisect-4) 
> [FLINK-22017][coordination] Get the ConsumedPartitionGroup that 
> IntermediateResultPartition and DefaultResultPartition belong to [Thesharing]
> d8b1a6fd368 [3 weeks ago] [FLINK-23372][streaming-java] Disable 
> AllVerticesInSameSlotSharingGroupByDefault in batch mode [Timo Walther]
> 4a78097d038 [3 weeks ago] (pn/bisect-3, bisect-3, 
> refs/bisect/good-4a78097d0385749daceafd8326930c8cc5f26f1a) 
> [FLINK-21928][clients][runtime] Introduce static method constructors of 
> DuplicateJobSubmissionException for better readability. [David Moravek]
> 172b9e32215 [3 weeks ago] [FLINK-21928][clients] JobManager failover should 
> succeed, when trying to resubmit already terminated job in application mode. 
> [David Moravek]
> f483008db86 [3 weeks ago] [FLINK-21928][core] Introduce 
> org.apache.flink.util.concurrent.FutureUtils#handleException method, that 
> allows future to recover from the specied exception. [David Moravek]
> d7ac08c2ac0 [3 weeks ago] (pn/bisect-2, bisect-2, 
> refs/bisect/good-d7ac08c2ac06b9ff31707f3b8f43c07817814d4f) 
> [FLINK-22843][docs-zh] Document and code are inconsistent [ZhiJie Yang]
> 16c3ea427df [3 weeks ago] [hotfix] Split the final checkpoint related tests 
> to a separate test class. [Yun Gao]
> 31b3d37a22c [7 weeks ago] [FLINK-21089][runtime] Skip the execution of new 
> sources if finished on restore [Yun Gao]
> 20fe062e1b5 [3 weeks ago] [FLINK-21089][runtime] Skip execution for the 
> legacy source task if finished on restore [Yun Gao]
> 874c627114b [3 weeks ago] [FLINK-21089][runtime] Skip the lifecycle method of 
> operators if finished on restore [Yun Gao]
> ceaf24b1d88 [3 weeks ago] (pn/bisect-1, bisect-1, 
> refs/bisect/good-ceaf24b1d881c2345a43f305d40435519a09cec9) [hotfix] Fix 
> isClosed() for operator wrapper and proxy operator close to the operator 
> chain [Yun Gao]
> 41ea591a6db [3 weeks ago] [FLINK-22627][runtime] Remove unused slot request 
> protocol [Yangze Guo]
> 489346b60f8 [3 months ago] [FLINK-22627][runtime] Remove PendingSlotRequest 
> [Yangze Guo]
> 8ffb4d2af36 [3 months ago] [FLINK-22627][runtime] Remove TaskManagerSlot 
> [Yangze Guo]
> 72073741588 [3 months ago] [FLINK-22627][runtime] Remove SlotManagerImpl and 
> its related tests [Yangze Guo]
> bdb3b7541b3 [3 months ago] [hotfix][yarn] Remove unused internal options in 
> YarnConfigOptionsInternal [Yangze Guo]
> a6a9b192eac [3 weeks ago] [FLINK-23201][streaming] Reset alignment only for 
> the currently processed checkpoint [Anton Kalashnikov]
> b35701a35c7 [3 weeks ago] [FLINK-23201][streaming] Calculate checkpoint 
> alignment time only for last started checkpoint [Anton Kalashnikov]
> 3abec22c536 [3 weeks ago] [FLINK-23107][table-runtime] Separate 
> implementation of deduplicate rank from other rank functions [Shuo Cheng]
> 1a195f5cc59 [3 weeks ago] [FLINK-16093][docs-zh] Translate "System Functions" 
> page of "Functions" into Chinese (#16348) [ZhiJie Yang]
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-23354) Limit the size of ShuffleDescriptors in PermanentBlobCache on TaskExecutor

2021-07-29 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-23354.
---
Resolution: Done

Done via 5c475d41fea3c81557e0d463bed1c94024dd0da5

> Limit the size of ShuffleDescriptors in PermanentBlobCache on TaskExecutor
> --
>
> Key: FLINK-23354
> URL: https://issues.apache.org/jira/browse/FLINK-23354
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> _This is part 3 of the optimization related to task deployments. For more 
> details about the overall description and part 1, please see FLINK-23005. 
> For more details about part 2, please see FLINK-23218._
> Currently a TaskExecutor uses the BlobCache to cache blobs transported from 
> the JobManager. The cache entries are local files stored on the TaskExecutor 
> and are not cleaned up until one hour after the related job has finished. In 
> FLINK-23218, we are going to distribute the cached ShuffleDescriptors via 
> blobs. When a large number of failovers happen, many cache entries will 
> accumulate on the local disk and occupy a large amount of disk space. In 
> extreme cases, the blob cache could fill up the disk.
> So we need to add a size limit for the ShuffleDescriptors stored in the 
> PermanentBlobCache on the TaskExecutor, as described in the comments of 
> FLINK-23218. The main idea is to add a size limit and delete blobs in LRU 
> order when the limit is exceeded. Before a blob item is cached, the 
> TaskExecutor first checks the overall size of the cache. If the overall size 
> exceeds the limit, blobs are deleted in LRU order until the limit is no 
> longer exceeded. If a deleted blob is needed again afterwards, it will be 
> downloaded again from HA storage or the blob server.
> The default value of the size limit for the ShuffleDescriptors in the 
> PermanentBlobCache on the TaskExecutor will be 100 MiB.
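For illustration, a minimal sketch of the check-then-evict logic described above, assuming a simple size tracker keyed by blob key. The class and method names (BlobCacheSizeTracker, track, deleteLocalFile) are hypothetical and not the actual Flink implementation:

{code:java}
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative size-limited, LRU-evicting tracker for cached blobs.
public class BlobCacheSizeTracker {

    private final long sizeLimit; // e.g. 100 MiB
    private long currentSize;
    // access-order LinkedHashMap iterates entries from least to most recently used
    private final LinkedHashMap<String, Long> blobSizes =
            new LinkedHashMap<>(16, 0.75f, true);

    public BlobCacheSizeTracker(long sizeLimit) {
        this.sizeLimit = sizeLimit;
    }

    /** Called before a blob is cached; evicts least recently used blobs until the new one fits. */
    public synchronized void track(String blobKey, long blobSize) {
        Iterator<Map.Entry<String, Long>> it = blobSizes.entrySet().iterator();
        while (currentSize + blobSize > sizeLimit && it.hasNext()) {
            Map.Entry<String, Long> leastRecentlyUsed = it.next();
            deleteLocalFile(leastRecentlyUsed.getKey()); // evicted blobs are re-downloaded on demand
            currentSize -= leastRecentlyUsed.getValue();
            it.remove();
        }
        blobSizes.put(blobKey, blobSize);
        currentSize += blobSize;
    }

    private void deleteLocalFile(String blobKey) {
        // placeholder: remove the cached blob file from the local disk
    }
}
{code}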



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23172) Links of restart strategy in configuration page is broken

2021-07-28 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17389206#comment-17389206
 ] 

Zhu Zhu commented on FLINK-23172:
-

Thanks for reporting this problem! [~Thesharing]
I have assigned you this ticket.

> Links of restart strategy in configuration page is broken
> -
>
> Key: FLINK-23172
> URL: https://issues.apache.org/jira/browse/FLINK-23172
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.14.0
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> The links in the Fault Tolerance section of [the configuration 
> page|https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#fault-tolerance/]
>  are broken. Currently the link refers to 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/dev/task_failure_recovery.html#fixed-delay-restart-strategy,
>  which doesn't exist and leads to a 404 error. The correct link is 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/task_failure_recovery/#fixed-delay-restart-strategy.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-23172) Links of restart strategy in configuration page is broken

2021-07-28 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-23172:
---

Assignee: Zhilong Hong

> Links of restart strategy in configuration page is broken
> -
>
> Key: FLINK-23172
> URL: https://issues.apache.org/jira/browse/FLINK-23172
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.14.0
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> The links in the Fault Tolerance section of [the configuration 
> page|https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#fault-tolerance/]
>  are broken. Currently the link refers to 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/dev/task_failure_recovery.html#fixed-delay-restart-strategy,
>  which doesn't exist and leads to a 404 error. The correct link is 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/task_failure_recovery/#fixed-delay-restart-strategy.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22773) Optimize the construction of pipelined regions

2021-07-28 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-22773:
---

Assignee: Zhilong Hong

> Optimize the construction of pipelined regions
> --
>
> Key: FLINK-22773
> URL: https://issues.apache.org/jira/browse/FLINK-22773
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.13.0
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Major
> Fix For: 1.14.0
>
>
> During the initialization of DefaultExecutionTopology, pipelined regions will 
> be computed for scheduling. Currently the complexity of this procedure is 
> O(N^2):
>  
> {code:java}
> for all vertices in the topology:
>   for all consumed results of the vertex:
> if the consumed result is reconnectable:
>   merge the current region with its producer region
> {code}
> One possible solution is mentioned in FLINK-17330.
> If we can optimize this procedure from O(N^2) to O(N), it will speed up the 
> initialization of SchedulerNG, and accelerate the submission of a new job, 
> especially for OLAP jobs.
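One way to bring this merging step down to (near-)linear time is to process each reconnectable result group once and merge the producer and consumer regions with a union-find structure. A minimal, hypothetical sketch follows (vertex ids as plain strings; this is an illustration, not the actual FLINK-17330 design):

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical union-find over vertex ids; merging all vertices of a
// reconnectable result group once avoids the per-vertex-pair scan.
public final class RegionUnionFind {

    private final Map<String, String> parent = new HashMap<>();

    /** Finds the region root of a vertex, with path compression. */
    public String find(String vertexId) {
        parent.putIfAbsent(vertexId, vertexId);
        String root = parent.get(vertexId);
        if (!root.equals(vertexId)) {
            root = find(root);
            parent.put(vertexId, root);
        }
        return root;
    }

    /** Merges the regions of a producer and a consumer of a reconnectable result. */
    public void union(String producerVertexId, String consumerVertexId) {
        parent.put(find(producerVertexId), find(consumerVertexId));
    }

    public boolean sameRegion(String a, String b) {
        return find(a).equals(find(b));
    }
}
{code}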



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22773) Optimize the construction of pipelined regions

2021-07-28 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-22773:

Fix Version/s: 1.14.0

> Optimize the construction of pipelined regions
> --
>
> Key: FLINK-22773
> URL: https://issues.apache.org/jira/browse/FLINK-22773
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.13.0
>Reporter: Zhilong Hong
>Priority: Major
> Fix For: 1.14.0
>
>
> During the initialization of DefaultExecutionTopology, pipelined regions will 
> be computed for scheduling. Currently the complexity of this procedure is 
> O(N^2):
>  
> {code:java}
> for all vertices in the topology:
>   for all consumed results of the vertex:
> if the consumed result is reconnectable:
>   merge the current region with its producer region
> {code}
> One possible solution is mentioned in FLINK-17330.
> If we can optimize this procedure from O(N^2) to O(N), it will speed up the 
> initialization of SchedulerNG, and accelerate the submission of a new job, 
> especially for OLAP jobs.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-23354) Limit the size of ShuffleDescriptors in PermanentBlobCache on TaskExecutor

2021-07-28 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-23354:
---

Assignee: Zhilong Hong

> Limit the size of ShuffleDescriptors in PermanentBlobCache on TaskExecutor
> --
>
> Key: FLINK-23354
> URL: https://issues.apache.org/jira/browse/FLINK-23354
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> _This is the part 3 of the optimization related to task deployments. For more 
> details about the overall description and the part 1, please see FLINK-23005. 
> For more details about the part 2 please see FLINK-23218._
> Currently a TaskExecutor uses BlobCache to cache the blobs transported from 
> JobManager. The caches are the local file stored on the TaskExecutor. The 
> blob cache will not be cleaned up until one hour after the related job is 
> finished. In FLINK-23218, we are going to distribute the cached 
> ShuffleDescriptors via blob. When large amount of failovers happen, there 
> will be a lot of cache stored on local disk. The blob cache will occupy large 
> amount of disk space. In extreme cases, the blob would blow up the disk space.
> So we need to add a limit size for the ShuffleDescriptors stored in 
> PermanentBlobCache on TaskExecutor, as described in the comments of 
> FLINK-23218. The main idea is to add a size limit and and delete the blobs in 
> LRU order if the size limit is exceeded. Before a blob item is cached, 
> TaskExecutor will firstly check the overall size of cache. If the overall 
> size exceeds the limit, the blob will be deleted in LRU order until the limit 
> is not exceeded anymore. For the blob cache that is deleted, if it is used 
> afterwards, it will be downloaded from the HA or the blob server again.
> The default value of the size limit for the ShuffleDescriptors in 
> PermanentBlobCache on TaskExecutor will be 100 MiB.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23402) Expose a consistent GlobalDataExchangeMode

2021-07-28 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1730#comment-1730
 ] 

Zhu Zhu commented on FLINK-23402:
-

+1 for option #2 to rename {{ShuffleMode}} as well as the corresponding config 
option. The config option will always be required for SQL jobs (as a 
replacement for {{table.exec.shuffle-mode}}), and we cannot prevent users from 
setting execution mode and shuffle mode in invalid combinations.

Agreed that {{AUTOMATIC}} does not make much sense then, and a proper default 
value of the config option would be good enough. So I think we can drop it, 
keep only {{ALL_EXCHANGES_PIPELINED}} and {{ALL_EXCHANGES_BLOCKING}} for the 
moment, and use {{ALL_EXCHANGES_BLOCKING}} as the default value.
Later, after we have implemented the proposed global shuffle mode in 
FLINK-23470, we can add it with an explicit name 
({{INTRA_SLOT_EXCHANGES_PIPELINED}} maybe) to the accepted values of the config 
option and make it the default value.
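For illustration, usage of such a renamed option could look roughly like the snippet below. The option key {{execution.batch-shuffle-mode}} and the value names are assumptions about how the rename might land, not the final API:

{code:java}
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BatchShuffleModeExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // assumed option key/value; the actual names depend on the outcome of this ticket
        conf.setString("execution.batch-shuffle-mode", "ALL_EXCHANGES_BLOCKING");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        // ... build and execute the pipeline
    }
}
{code}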

> Expose a consistent GlobalDataExchangeMode
> --
>
> Key: FLINK-23402
> URL: https://issues.apache.org/jira/browse/FLINK-23402
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataStream
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> The Table API makes the {{GlobalDataExchangeMode}} configurable via 
> {{table.exec.shuffle-mode}}.
> In Table API batch mode the StreamGraph is configured with 
> {{ALL_EDGES_BLOCKING}} and in DataStream API batch mode 
> {{FORWARD_EDGES_PIPELINED}}.
> I would vote for unifying the exchange mode of both APIs so that complex SQL 
> pipelines behave identically in {{StreamTableEnvironment}} and 
> {{TableEnvironment}}. Also, the feedback I got so far suggests that 
> {{ALL_EDGES_BLOCKING}} is the safer option for running pipelines successfully 
> with limited resources.
> [~lzljs3620320]
> {quote}
> The previous history was like this:
> - The default value is pipeline, and we find that many times due to 
> insufficient resources, the deployment will hang. And the typical use of 
> batch jobs is small resources running large parallelisms, because in batch 
> jobs, the granularity of failover is related to the amount of data processed 
> by a single task. The smaller the amount of data, the faster the fault 
> tolerance. So most of the scenarios are run with small resources and large 
> parallelisms, little by little slowly running.
> - Later, we switched the default value to blocking. We found that the better 
> blocking shuffle implementation would not slow down the running speed much. 
> We tested tpc-ds and it took almost the same time.
> {quote}
> [~dwysakowicz]
> {quote}
> I don't see a problem with changing the default value for DataStream batch 
> mode if you think ALL_EDGES_BLOCKING is the better default option.
> {quote}
> In any case, we should make this configurable for DataStream API users and 
> make the specific Table API option obsolete.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-23005) Cache the compressed serialized value of ShuffleDescriptors

2021-07-26 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-23005.
---
Resolution: Done

Done via
a3f72f20acd4df1dbdc61e145d1d932f61ca63f8
6812d18c358ce007a5cbcd685f32f59c70b03a49
b0d34f819804269b0d980b6747db765133849fe4

> Cache the compressed serialized value of ShuffleDescriptors
> ---
>
> Key: FLINK-23005
> URL: https://issues.apache.org/jira/browse/FLINK-23005
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> h3. Introduction
> The optimizations introduced in FLINK-21110 so far have improved the 
> performance of job initialization, failover and partition releasing. 
> However, task deployment is still slow. For a job with two vertices, each 
> with a parallelism of 8k and connected by an all-to-all edge, it takes 
> 32.611s to deploy all the tasks and make them transition to running. If the 
> parallelisms are 16k, it may take more than 2 minutes.
> As the creation of TaskDeploymentDescriptors runs in the main thread of the 
> jobmanager, the jobmanager cannot deal with other akka messages such as 
> heartbeats and task status updates for more than two minutes.
>  
> All in all, currently there are two issues in the deployment of tasks for 
> large scale jobs:
>  # It takes a long time to deploy tasks, especially for all-to-all edges.
>  # Heartbeat timeouts may happen during or after task deployment. For a 
> streaming job, this would cause a failover of the entire region. The job may 
> never transition to running, since another heartbeat timeout could occur 
> while the new tasks are being deployed.
> h3. Proposal
> Task deployment involves the following procedures:
>  # Jobmanager creates TaskDeploymentDescriptor for each task in the main 
> thread
>  # TaskDeploymentDescriptor is serialized in the future executor
>  # Akka transports TaskDeploymentDescriptors to TaskExecutors via RPC call
>  # TaskExecutors create a new task thread and execute it
>  
> The optimization contains three parts:
> 1. Cache the compressed serialized value of ShuffleDescriptors
> ShuffleDescriptors in the TaskDeploymentDescriptor describe the 
> IntermediateResultPartitions that a task consumes. For downstream vertices 
> connected by an all-to-all edge with parallelism _N_, we currently calculate 
> _N_ ShuffleDescriptors _N_ times. However, these vertices share the same 
> ShuffleDescriptors since they all consume the same 
> IntermediateResultPartitions, so we don't need to calculate them for each 
> downstream vertex individually; we can just cache them. This decreases the 
> overall complexity of calculating TaskDeploymentDescriptors from O(N^2) to 
> O(N).
> Furthermore, we don't need to serialize the same ShuffleDescriptors _N_ 
> times, so we can cache the serialized value of the ShuffleDescriptors 
> instead of the original objects. To decrease the size of akka messages and 
> reduce the transmission of replicated data over the network, the serialized 
> value can be compressed.
> 2. Distribute the ShuffleDescriptors via blob server (see FLINK-23218)
> 3. Limit the size of ShuffleDescriptors in PermanentBlobCache on TaskExecutor 
> (see FLINK-23354)
> h3. Summary
> In summary, the optimization of task deployment is to introduce a cache for 
> the TaskDeploymentDescriptor. We cache the compressed serialized value of 
> ShuffleDescriptors. If the size of the value exceeds a certain threshold, the 
> value would be distributed via the blob server.
> h3. Comparison
> We implemented a POC and conducted an experiment to compare the performance 
> of our optimization. We chose a streaming job for the experiment because no 
> task starts running until all tasks are deployed, which avoids other 
> confounding factors. The job contains two vertices, a source and a sink, 
> connected by an all-to-all edge.
> The results illustrated below are the time interval between the timestamp of 
> the first task that transitions to _deploying_ and the timestamp of the last 
> task that transitions to _running_:
> ||Parallelism||Before||After ||
> |8000*8000|32.611s|6.480s|
> |16000*16000|128.408s|19.051s|
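As a rough illustration of the "serialize once, compress, cache" idea for a group of downstream vertices that consume the same partitions, here is a minimal sketch using JDK serialization and a Deflater; class and method names are hypothetical, not the actual Flink code:

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;

// Hypothetical cache of the compressed serialized value shared by all consumers
// of the same IntermediateResultPartitions.
public final class CompressedDescriptorCache {

    private byte[] cachedCompressedBytes;

    public synchronized byte[] getOrCompute(Serializable shuffleDescriptors) throws IOException {
        if (cachedCompressedBytes == null) {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out =
                    new ObjectOutputStream(
                            new DeflaterOutputStream(buffer, new Deflater(Deflater.BEST_SPEED)))) {
                out.writeObject(shuffleDescriptors); // serialize and compress exactly once
            }
            cachedCompressedBytes = buffer.toByteArray();
        }
        return cachedCompressedBytes; // reused for every downstream task deployment
    }
}
{code}

If the compressed value is still larger than a certain threshold, it can additionally be offloaded to the blob server, which is part 2 of this optimization.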



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-26 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-23218:
---

Assignee: Zhilong Hong

> Distribute the ShuffleDescriptors via blob server
> -
>
> Key: FLINK-23218
> URL: https://issues.apache.org/jira/browse/FLINK-23218
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> _This is part 2 of the optimization related to task deployments. For more 
> details about the overall description and part 1, please see FLINK-23005._
> For the ShuffleDescriptors of vertices with 8k parallelism, the size of the 
> serialized value is more than 700 Kilobytes. After compression, it would be 
> 200 Kilobytes or so. The overall size of 8k TaskDeploymentDescriptors is 
> more than 1.6 Gigabytes. Since Akka cannot send the messages as fast as the 
> TaskDeploymentDescriptors are created, these TaskDeploymentDescriptors 
> become a heavy burden for the garbage collector to deal with.
> In the TaskDeploymentDescriptor, JobInformation and TaskInformation are 
> distributed via the blob server if their sizes exceed a certain threshold 
> (defined by {{blob.offload.minsize}}). TaskExecutors request the information 
> from the blob server once they begin to process the TaskDeploymentDescriptor. 
> This makes sure that the JobManager doesn't need to keep all the copies in 
> heap memory until all the TaskDeploymentDescriptors are sent; there is only 
> one copy on the blob server. Like the JobInformation, we can distribute the 
> cached ShuffleDescriptors via the blob server if their overall size exceeds 
> the threshold.
> This improvement helps to avoid long garbage collection pauses during task 
> deployment.
> The cached ShuffleDescriptors on the blob server will be removed once the 
> partitions related to them are no longer valid. This makes sure the blob 
> server won't fill up with cached ShuffleDescriptors, even if there is a 
> long-running session on the cluster.
> In part 3 we will limit the size of the ShuffleDescriptors in the 
> PermanentBlobCache on the TaskExecutor. This makes sure the TaskExecutor 
> won't run out of disk space because of cached ShuffleDescriptors. For more 
> details please see FLINK-23354.
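A compact sketch of the threshold decision described above; the {{BlobStore}} abstraction and method names are placeholders for this illustration, not the actual Flink offloading code:

{code:java}
// Hypothetical threshold check: small payloads stay inline in the deployment
// descriptor, large ones are offloaded to the blob server once and fetched by key.
public final class DescriptorOffload {

    /** Minimal blob store abstraction used only for this sketch. */
    public interface BlobStore {
        String putPermanent(byte[] value); // returns a key that TaskExecutors can fetch by
    }

    /** Either the raw bytes (shipped inline via akka) or a blob key referencing the payload. */
    public static Object offloadIfLarge(
            byte[] compressedDescriptors, long offloadMinSize, BlobStore blobStore) {
        if (compressedDescriptors.length < offloadMinSize) {
            return compressedDescriptors; // small enough: keep it in the TaskDeploymentDescriptor
        }
        return blobStore.putPermanent(compressedDescriptors); // one shared copy on the blob server
    }
}
{code}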



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-26 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-23218.
---
Resolution: Done

Done via ee7e9c3b87f6533d6f54361fddc71585d6b8ad61

> Distribute the ShuffleDescriptors via blob server
> -
>
> Key: FLINK-23218
> URL: https://issues.apache.org/jira/browse/FLINK-23218
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhilong Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> _This is part 2 of the optimization related to task deployments. For more 
> details about the overall description and part 1, please see FLINK-23005._
> For the ShuffleDescriptors of vertices with 8k parallelism, the size of the 
> serialized value is more than 700 Kilobytes. After compression, it would be 
> 200 Kilobytes or so. The overall size of 8k TaskDeploymentDescriptors is 
> more than 1.6 Gigabytes. Since Akka cannot send the messages as fast as the 
> TaskDeploymentDescriptors are created, these TaskDeploymentDescriptors 
> become a heavy burden for the garbage collector to deal with.
> In the TaskDeploymentDescriptor, JobInformation and TaskInformation are 
> distributed via the blob server if their sizes exceed a certain threshold 
> (defined by {{blob.offload.minsize}}). TaskExecutors request the information 
> from the blob server once they begin to process the TaskDeploymentDescriptor. 
> This makes sure that the JobManager doesn't need to keep all the copies in 
> heap memory until all the TaskDeploymentDescriptors are sent; there is only 
> one copy on the blob server. Like the JobInformation, we can distribute the 
> cached ShuffleDescriptors via the blob server if their overall size exceeds 
> the threshold.
> This improvement helps to avoid long garbage collection pauses during task 
> deployment.
> The cached ShuffleDescriptors on the blob server will be removed once the 
> partitions related to them are no longer valid. This makes sure the blob 
> server won't fill up with cached ShuffleDescriptors, even if there is a 
> long-running session on the cluster.
> In part 3 we will limit the size of the ShuffleDescriptors in the 
> PermanentBlobCache on the TaskExecutor. This makes sure the TaskExecutor 
> won't run out of disk space because of cached ShuffleDescriptors. For more 
> details please see FLINK-23354.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23479) IncrementalAggregateJsonPlanTest.testIncrementalAggregateWithSumCountDistinctAndRetraction fail

2021-07-26 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17387460#comment-17387460
 ] 

Zhu Zhu commented on FLINK-23479:
-

another instance: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=20976&view=logs&j=0c940707-2659-5648-cbe6-a1ad63045f0a&t=075c2716-8010-5565-fe08-3c4bb45824a4

> IncrementalAggregateJsonPlanTest.testIncrementalAggregateWithSumCountDistinctAndRetraction
>  fail
> ---
>
> Key: FLINK-23479
> URL: https://issues.apache.org/jira/browse/FLINK-23479
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.14.0, 1.13.1
>Reporter: Dawid Wysakowicz
>Assignee: godfrey he
>Priority: Blocker
>  Labels: pull-request-available, stale-blocker, test-stability
> Fix For: 1.14.0, 1.13.3
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=20864&view=logs&j=0c940707-2659-5648-cbe6-a1ad63045f0a&t=075c2716-8010-5565-fe08-3c4bb45824a4&l=8904
> {code}
> Jul 23 04:21:56 [ERROR] 
> testIncrementalAggregateWithSumCountDistinctAndRetraction(org.apache.flink.table.planner.plan.nodes.exec.stream.IncrementalAggregateJsonPlanTest)
>   Time elapsed: 0.067 s  <<< FAILURE!
> Jul 23 04:21:56 org.junit.ComparisonFailure: 
> expected:<...RyaWJ1dGVzcQB+AAFMAA[tjb21wYXJpc2lvbnQAS0xvcmcvYXBhY2hlL2ZsaW5rL3RhYmxlL3R5cGVzL2xvZ2ljYWwvU3RydWN0dXJlZFR5cGUkU3RydWN0dXJlZENvbXBhcmlzaW9uO0wAE2ltcGxlbWVudGF0aW9uQ2xhc3NxAH4AA0wACXN1cGVyVHlwZXQANUxvcmcvYXBhY2hlL2ZsaW5rL3RhYmxlL3R5cGVzL2xvZ2ljYWwvU3RydWN0dXJlZFR5cGU7eHIANG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5Vc2VyRGVmaW5lZFR5cGUAAQIAA1oAB2lzRmluYWxMAAtkZXNjcmlwdGlvbnQAEkxqYXZhL2xhbmcvU3RyaW5nO0wAEG9iamVjdElkZW50aWZpZXJ0ADFMb3JnL2FwYWNoZS9mbGluay90YWJsZS9jYXRhbG9nL09iamVjdElkZW50aWZpZXI7eHIAMG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5Mb2dpY2FsVHlwZQABAgACWgAKaXNOdWxsYWJsZUwACHR5cGVSb290dAA2TG9yZy9hcGFjaGUvZmxpbmsvdGFibGUvdHlwZXMvbG9naWNhbC9Mb2dpY2FsVHlwZVJvb3Q7eHABfnIANG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5Mb2dpY2FsVHlwZVJvb3QAABIAAHhyAA5qYXZhLmxhbmcuRW51bQAAEgAAeHB0AA9TVFJVQ1RVUkVEX1RZUEUBcHABc3IAJmphdmEudXRpbC5Db2xsZWN0aW9ucyRVbm1vZGlmaWFibGVMaXN0/A8lMbXsjhACAAFMAARsaXN0cQB+AAF4cgAsamF2YS51dGlsLkNvbGxlY3Rpb25zJFVubW9kaWZpYWJsZUNvbGxlY3Rpb24ZQgCAy173HgIAAUwAAWN0ABZMamF2YS91dGlsL0NvbGxlY3Rpb247eHBzcgATamF2YS51dGlsLkFycmF5TGlzdHiB0h2Zx2GdAwABSQAEc2l6ZXhwAXcEAXNyAEdvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuU3RydWN0dXJlZFR5cGUkU3RydWN0dXJlZEF0dHJpYnV0ZQABAgADTAALZGVzY3JpcHRpb25xAH4ADEwABG5hbWVxAH4ADEwABHR5cGVxAH4ABHhwcHQAA21hcHNyACxvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuTWFwVHlwZQABAgACTAAHa2V5VHlwZXEAfgAETAAJdmFsdWVUeXBlcQB+AAR4cQB+AA4BfnEAfgARdAADTUFQc3IALG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5JbnRUeXBlAAECAAB4cQB+AA4AfnEAfgARdAAHSU5URUdFUnNyAC9vcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuQmlnSW50VHlwZQABAgAAeHEAfgAOAH5xAH4AEXQABkJJR0lOVHhxAH4AGn5yAElvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuU3RydWN0dXJlZFR5cGUkU3RydWN0dXJlZENvbXBhcmlzaW9uAAASAAB4cQB+ABJ0AAROT05FcQB+AAdwc3EAfgAZAXcEAXNyAC1vcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLktleVZhbHVlRGF0YVR5cGWOJMm4zTygngIAAkwAC2tleURhdGFUeXBldAAnTG9yZy9hcGFjaGUvZmxpbmsvdGFibGUvdHlwZXMvRGF0YVR5cGU7TAANdmFsdWVEYXRhVHlwZXEAfgAveHEAfgACdnIADWphdmEudXRpbC5NYXAAAHhwcQB+AB9zcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5BdG9taWNEYXRhVHlwZRqIUyn6eiMyAgAAeHEAfgACdnIAEWphdmEubGFuZy5JbnRlZ2VyEuKgpPeBhzgCAAFJAAV2YWx1ZXhyABBqYXZhLmxhbmcuTnVtYmVyhqyVHQuU4IsCAAB4cHEAfgAjc3EAfgAzdnIADmphdmEubGFuZy5Mb25nO4vkkMyPI98CAAFKAAV2YWx1ZXhxAH4ANnEAfgAneAAAFFf9AQEAVG9yZy5hcGFjaGUuZmxpbmsudGFibGUucnVudGltZS50eXBldXRpbHMuUm93RGF0YVNlcmlhbGl6ZXIkUm93RGF0YVNlcmlhbGl6ZXJTbmFwc2hvdAMBrO0ABXNyACxvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuTWFwVHlwZQABAgACTAAHa2V5VHlwZXQAMkxvcmcvYXBhY2hlL2ZsaW5rL3RhYmxlL3R5cGVzL2xvZ2ljYWwvTG9naWNhbFR5cGU7TAAJdmFsdWVUeXBlcQB+AAF4cgAwb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5sb2dpY2FsLkxvZ2ljYWxUeXBlAAECAAJaAAppc051bGxhYmxlTAAIdHlwZVJvb3R0ADZMb3JnL2FwYWNoZS9mbGluay90YWJsZS90eXBlcy9sb2dpY2FsL0xvZ2ljYWxUeXBlUm9vdDt4cAF+cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5sb2dpY2FsLkxvZ2ljYWxUeXBlUm9vdAAAEgAAeHIADmphdmEubGFuZy5FbnVtAAASAAB4cHQAA01BUHNyACxvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuSW50VHlwZQABAgAAeHEAfgACAH5xAH4ABXQAB0lOVEVHRVJzcgAvb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5sb2dpY2FsLkJpZ0ludFR5cGUAAQIAAHhxAH4AAgB+cQB+AAV0AAZCSUdJTlQAFFf9AQEAVG9yZy5hcGFjaGUuZmxpbmsudGFibGUucnVudGltZS50eXBldXRpbHMuTWFwRGF0YVNlcmlhbGl6ZXIkTWFwRGF0YVNlcmlhbGl6ZXJTbmFwc2hvdAOs7QAFc3IALG9yZy5hcGFjaGUuZmxpbmsudGFibGUudH

[jira] [Commented] (FLINK-23470) Use blocking shuffles but pipeline within a slot for batch mode

2021-07-23 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17386107#comment-17386107
 ] 

Zhu Zhu commented on FLINK-23470:
-

Sorry I did not see the discussion in FLINK-23402 until I noticed this ticket.
I agree that a "pipeline within a slot" mode is of good value and is easy for 
user understanding.
Actually {{FORWARD_EDGES_PIPELINED}} was designed for this purpose. It was 
working for table/sql jobs. However, it may not work for DataStream jobs 
because forward upstream and downstream tasks can be set to different slot 
sharing groups.

Therefore, to achieve goal, I think we can modify the handling of 
{{FORWARD_EDGES_PIPELINED}} in 
{{StreamingJobGraphGenerator#determineResultPartitionType()}} a bit to generate 
a PIPELINED edge only if the upstream and downstream tasks are in the same slot 
sharing group. And we may drop {{RESCALE_EDGES_PIPELINED}} and rename 
{{FORWARD_EDGES_PIPELINED}} to {{PIPELINED_WITHIN_SLOT}}/{{AUTOMATIC}}.
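A rough sketch of that rule for a FORWARD edge; this is a hypothetical helper, not the actual {{StreamingJobGraphGenerator}} code, and the slot sharing group comparison is the only point being illustrated:

{code:java}
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.streaming.api.graph.StreamNode;

// Hypothetical rule for FORWARD edges under FORWARD_EDGES_PIPELINED.
public final class SlotLocalPipelining {

    public static ResultPartitionType forForwardEdge(StreamNode upstream, StreamNode downstream) {
        String upstreamGroup = upstream.getSlotSharingGroup();
        boolean sameSlotSharingGroup =
                upstreamGroup != null && upstreamGroup.equals(downstream.getSlotSharingGroup());
        // pipeline only when producer and consumer share the same slot sharing group
        return sameSlotSharingGroup
                ? ResultPartitionType.PIPELINED_BOUNDED
                : ResultPartitionType.BLOCKING;
    }
}
{code}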

> Use blocking shuffles but pipeline within a slot for batch mode
> ---
>
> Key: FLINK-23470
> URL: https://issues.apache.org/jira/browse/FLINK-23470
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream
>Reporter: Timo Walther
>Priority: Major
>
> As discussed in FLINK-23402, we would like to introduce a good default 
> shuffle mode for batch runtime mode that is a trade-off between all pipelined 
> and all blocking shuffles.
> From the discussion in FLINK-23402:
> For the shuffle modes, I think those three settings are actually sufficient:
> 1. pipeline all, for batch execution that wants pipelined shuffles. (Still 
> batch recovery, no checkpoints, batch operators)
> 2. batch all, just in case you want to.
> 3. batch shuffles, pipeline within a slot. (DEFAULT)
> This should be the default, and it means we batch whenever a slot has a 
> dependency on another slot.
> A dependency between slots is:
> - any all-to-all connection (keyBy, broadcast, rebalance, random)
> - any pointwise connection (rescale)
> - any forward between different slot sharing groups
> Effectively only FORWARD connections within the same slot sharing group have 
> no dependency on another slot.
> That mode makes a lot of sense as the default, because it guarantees that we 
> can always run the program as long as we have at least one slot. No resource 
> starvation ever. But it retains pipelining where we don't chain operators due 
> to missing chaining logic (but we still slot-share them).
> Compared to this (3) mode, FORWARD_EDGES_PIPELINED and 
> POINTWISE_EDGES_PIPELINED are not well-defined.
> POINTWISE_EDGES_PIPELINED is a gamble, it only works if you have a certain 
> amount of resources, related to the rescale factor. Otherwise the job may 
> fail with resource starvation. Hard to understand and debug for users; not a 
> great option in my opinion.
> FORWARD_EDGES_PIPELINED can also lead to job failure with resource starvation 
> when the forward connection connects different slot sharing groups.
> That's why I would drop those (they make it confusing for users), not reuse 
> the GlobalDataExchangeMode, and rather introduce option (3) above, which 
> mostly batches the exchanges, except when they are guaranteed to be in 
> the same slot.
> As a side note: The difference between (3) and (2) should be already 
> relatively small in SQL jobs and become smaller over time, as more and more 
> can be chained together.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-22 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17385644#comment-17385644
 ] 

Zhu Zhu commented on FLINK-23218:
-

Thanks for confirming! [~trohrmann]
And thanks for the explanation of the transient blob option. I think you are 
right that we can try re-offloading {{JobInformation}}, {{TaskInformation}} and 
{{ShuffleDescriptors}} before deploying a task. It may need some extra effort, 
though, to track and de-duplicate blobs on the BlobServer. So as a first step 
we will try introducing a {{read()}} API in {{PermanentBlobService}}, which 
might be simpler.
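For illustration, such a {{read()}} API could have roughly the following shape; the method name and signature below are assumptions for this sketch, not the final interface:

{code:java}
import java.io.IOException;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.blob.PermanentBlobKey;

// Hypothetical read-style accessor: hand out the blob contents instead of a file
// reference, so the cache can track accesses and safely delete evicted files.
public interface PermanentBlobReadAccess {

    /** Returns the contents of the permanent blob identified by the given job id and key. */
    byte[] readFile(JobID jobId, PermanentBlobKey key) throws IOException;
}
{code}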

> Distribute the ShuffleDescriptors via blob server
> -
>
> Key: FLINK-23218
> URL: https://issues.apache.org/jira/browse/FLINK-23218
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhilong Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> _This is part 2 of the optimization related to task deployments. For more 
> details about the overall description and part 1, please see FLINK-23005._
> For the ShuffleDescriptors of vertices with 8k parallelism, the size of the 
> serialized value is more than 700 Kilobytes. After compression, it would be 
> 200 Kilobytes or so. The overall size of 8k TaskDeploymentDescriptors is 
> more than 1.6 Gigabytes. Since Akka cannot send the messages as fast as the 
> TaskDeploymentDescriptors are created, these TaskDeploymentDescriptors 
> become a heavy burden for the garbage collector to deal with.
> In the TaskDeploymentDescriptor, JobInformation and TaskInformation are 
> distributed via the blob server if their sizes exceed a certain threshold 
> (defined by {{blob.offload.minsize}}). TaskExecutors request the information 
> from the blob server once they begin to process the TaskDeploymentDescriptor. 
> This makes sure that the JobManager doesn't need to keep all the copies in 
> heap memory until all the TaskDeploymentDescriptors are sent; there is only 
> one copy on the blob server. Like the JobInformation, we can distribute the 
> cached ShuffleDescriptors via the blob server if their overall size exceeds 
> the threshold.
> This improvement helps to avoid long garbage collection pauses during task 
> deployment.
> The cached ShuffleDescriptors on the blob server will be removed once the 
> partitions related to them are no longer valid. This makes sure the blob 
> server won't fill up with cached ShuffleDescriptors, even if there is a 
> long-running session on the cluster.
> In part 3 we will limit the size of the ShuffleDescriptors in the 
> PermanentBlobCache on the TaskExecutor. This makes sure the TaskExecutor 
> won't run out of disk space because of cached ShuffleDescriptors. For more 
> details please see FLINK-23354.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-21 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17385234#comment-17385234
 ] 

Zhu Zhu commented on FLINK-23218:
-

To summarize the investigation and discussion: a common blob cache size limit 
may be hard and complicated to implement. It can also be hard for users to 
understand that there is a config to limit the blob cache size but it only 
works for some of the permanent blobs.

Therefore, I'm thinking of just limiting the size of the {{ShuffleDescriptor}} 
blob cache and leaving other existing blobs unaffected. In the first version we 
can hard-code the size limit at 100 MB. That should be enough for the 
{{ShuffleDescriptor}} blob cache, because a {{ShuffleDescriptor}} blob for an 
8000x8000 ALL-to-ALL {{JobEdge}} is just 200 KB+ and normally there will not be 
too many such blobs. In this way, we do not need to expose a new config option 
to users, and existing blob usages will not be affected.

To achieve that, we can add a {{readAndTrack()}} method in 
{{PermanentBlobCache}} and only use it for {{ShuffleDescriptor}} blobs.
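A minimal sketch of what such a tracked read path could look like; the reader and tracker abstractions below are placeholders for this illustration, not the actual {{PermanentBlobCache}} code:

{code:java}
import java.io.IOException;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.blob.PermanentBlobKey;

// Hypothetical readAndTrack() entry point used only for ShuffleDescriptor blobs;
// other blob usages keep the existing, untracked read path.
public final class TrackedBlobReads {

    public static byte[] readAndTrack(
            PermanentBlobReader reader, BlobSizeTracker tracker, JobID jobId, PermanentBlobKey key)
            throws IOException {
        byte[] contents = reader.readFile(jobId, key); // plain read of the cached blob
        tracker.track(jobId, key, contents.length);    // may evict older blobs in LRU order
        return contents;
    }

    /** Minimal reader abstraction for this sketch. */
    public interface PermanentBlobReader {
        byte[] readFile(JobID jobId, PermanentBlobKey key) throws IOException;
    }

    /** Minimal size tracker abstraction for this sketch. */
    public interface BlobSizeTracker {
        void track(JobID jobId, PermanentBlobKey key, long size);
    }
}
{code}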

> Distribute the ShuffleDescriptors via blob server
> -
>
> Key: FLINK-23218
> URL: https://issues.apache.org/jira/browse/FLINK-23218
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhilong Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> h3. Introduction
> The optimizations introduced in FLINK-21110 so far have improved the 
> performance of job initialization, failover and partition releasing. 
> However, task deployment is still slow. For a job with two vertices, each 
> with a parallelism of 8k and connected by an all-to-all edge, it takes 
> 32.611s to deploy all the tasks and make them transition to running. If the 
> parallelisms are 16k, it may take more than 2 minutes.
> As the creation of TaskDeploymentDescriptors runs in the main thread of the 
> jobmanager, the jobmanager cannot deal with other akka messages such as 
> heartbeats and task status updates for more than two minutes.
>  
> All in all, currently there are two issues in the deployment of tasks for 
> large scale jobs:
>  # It takes a long time to deploy tasks, especially for all-to-all edges.
>  # Heartbeat timeouts may happen during or after task deployment. For a 
> streaming job, this would cause a failover of the entire region. The job may 
> never transition to running, since another heartbeat timeout could occur 
> while the new tasks are being deployed.
> h3. Proposal
> Task deployment involves the following procedures:
>  # Jobmanager creates TaskDeploymentDescriptor for each task in the main 
> thread
>  # TaskDeploymentDescriptor is serialized in the future executor
>  # Akka transports TaskDeploymentDescriptors to TaskExecutors via RPC call
>  # TaskExecutors create a new task thread and execute it
> The optimization contains two parts:
> *1. Cache the compressed serialized value of ShuffleDescriptors*
> ShuffleDescriptors in the TaskDeploymentDescriptor describe the 
> IntermediateResultPartitions that a task consumes. For downstream vertices 
> connected by an all-to-all edge with parallelism _N_, we currently calculate 
> _N_ ShuffleDescriptors _N_ times. However, these vertices share the same 
> ShuffleDescriptors since they all consume the same 
> IntermediateResultPartitions, so we don't need to calculate them for each 
> downstream vertex individually; we can just cache them. This decreases the 
> overall complexity of calculating TaskDeploymentDescriptors from O(N^2) to 
> O(N).
> Furthermore, we don't need to serialize the same ShuffleDescriptors _N_ 
> times, so we can cache the serialized value of the ShuffleDescriptors 
> instead of the original objects. To decrease the size of akka messages and 
> reduce the transmission of replicated data over the network, the serialized 
> value can be compressed.
> *2. Distribute the ShuffleDescriptors via blob server*
> For ShuffleDescriptors of vertices with 8k parallelism, the size of their 
> serialized value is more than 700 Kilobytes. After the compression, it would 
> be 200 Kilobytes or so. The overall size of 8k TaskDeploymentDescriptors is 
> more than 1.6 Gigabytes. Since Akka cannot send the messages as fast as the 
> TaskDeploymentDescriptors are created, these TaskDeploymentDescriptors would 
> become a heavy burden for the garbage collector to deal with.
> In TaskDeploymentDescriptor, JobInformation and TaskInformation are 
> distributed via the blob server if their sizes exceed a certain threshold 
> (which is defined as {{blob.offload.minsize}}). TaskExecutors request the 
> i

[jira] [Comment Edited] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-21 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17384876#comment-17384876
 ] 

Zhu Zhu edited comment on FLINK-23218 at 7/21/21, 1:02 PM:
---

I took a look at the code and it looks to me that transient blobs are not 
deleted once read. {{AbstractTaskManagerFileHandler}} will delete the transient 
blobs of log files, but that does not apply to all transient blobs in 
{{BlobServer}}.
However, I can see that transient blobs have a TTL by design, which starts 
ticking once a blob is generated and is refreshed on every read. Cached 
{{JobInformation}}, {{TaskInformation}} and {{ShuffleDescriptor}} may be used 
many times over a long time span that can exceed the TTL (e.g. a job may have 
been running for days before a failover happens). So I think transient blobs 
cannot meet the requirement.


was (Author: zhuzh):
I took a look at the code and looks to me that transient blobs are not deleted 
once read. {{AbstractTaskManagerFileHandler}} will delete the transient blobs 
of log files but it does not apply to all transient blobs in {{BlobServer}}.
However, I can see that transient blobs have a TTL by design which will start 
ticking once generated and will be refreshed once read. Cached 
{{JobInformation}}, {{TaskInformation}} and {{ShuffleDescriptor}} are possible 
to be used for many times during a long time span which can exceeds the TTL. So 
I think current transient blobs cannot meet the requirement.

> Distribute the ShuffleDescriptors via blob server
> -
>
> Key: FLINK-23218
> URL: https://issues.apache.org/jira/browse/FLINK-23218
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhilong Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> h3. Introduction
> The optimizations introduced in FLINK-21110 so far have improved the 
> performance of job initialization, failover and partition releasing. 
> However, task deployment is still slow. For a job with two vertices, each 
> with a parallelism of 8k and connected by an all-to-all edge, it takes 
> 32.611s to deploy all the tasks and make them transition to running. If the 
> parallelisms are 16k, it may take more than 2 minutes.
> As the creation of TaskDeploymentDescriptors runs in the main thread of the 
> jobmanager, the jobmanager cannot deal with other akka messages such as 
> heartbeats and task status updates for more than two minutes.
>  
> All in all, currently there are two issues in the deployment of tasks for 
> large scale jobs:
>  # It takes a long time to deploy tasks, especially for all-to-all edges.
>  # Heartbeat timeouts may happen during or after task deployment. For a 
> streaming job, this would cause a failover of the entire region. The job may 
> never transition to running, since another heartbeat timeout could occur 
> while the new tasks are being deployed.
> h3. Proposal
> Task deployment involves the following procedures:
>  # Jobmanager creates TaskDeploymentDescriptor for each task in the main 
> thread
>  # TaskDeploymentDescriptor is serialized in the future executor
>  # Akka transports TaskDeploymentDescriptors to TaskExecutors via RPC call
>  # TaskExecutors create a new task thread and execute it
> The optimization contains two parts:
> *1. Cache the compressed serialized value of ShuffleDescriptors*
> ShuffleDescriptors in the TaskDeploymentDescriptor describe the 
> IntermediateResultPartitions that a task consumes. For downstream vertices 
> connected by an all-to-all edge with parallelism _N_, we currently calculate 
> _N_ ShuffleDescriptors _N_ times. However, these vertices share the same 
> ShuffleDescriptors since they all consume the same 
> IntermediateResultPartitions, so we don't need to calculate them for each 
> downstream vertex individually; we can just cache them. This decreases the 
> overall complexity of calculating TaskDeploymentDescriptors from O(N^2) to 
> O(N).
> Furthermore, we don't need to serialize the same ShuffleDescriptors _N_ 
> times, so we can cache the serialized value of the ShuffleDescriptors 
> instead of the original objects. To decrease the size of akka messages and 
> reduce the transmission of replicated data over the network, the serialized 
> value can be compressed.
> *2. Distribute the ShuffleDescriptors via blob server*
> For ShuffleDescriptors of vertices with 8k parallelism, the size of their 
> serialized value is more than 700 Kilobytes. After the compression, it would 
> be 200 Kilobytes or so. The overall size of 8k TaskDeploymentDescriptors is 
> more than 1.6 Gigabytes. Since

[jira] [Commented] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-21 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17384876#comment-17384876
 ] 

Zhu Zhu commented on FLINK-23218:
-

I took a look at the code and it looks to me that transient blobs are not 
deleted once read. {{AbstractTaskManagerFileHandler}} will delete the transient 
blobs of log files, but that does not apply to all transient blobs in 
{{BlobServer}}.
However, I can see that transient blobs have a TTL by design, which starts 
ticking once a blob is generated and is refreshed on every read. Cached 
{{JobInformation}}, {{TaskInformation}} and {{ShuffleDescriptor}} may be used 
many times over a long time span that can exceed the TTL. So I think the 
current transient blobs cannot meet the requirement.

> Distribute the ShuffleDescriptors via blob server
> -
>
> Key: FLINK-23218
> URL: https://issues.apache.org/jira/browse/FLINK-23218
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhilong Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> h3. Introduction
> The optimizations introduced in FLINK-21110 so far have improved the 
> performance of job initialization, failover and partition releasing. 
> However, task deployment is still slow. For a job with two vertices, each 
> with a parallelism of 8k and connected by an all-to-all edge, it takes 
> 32.611s to deploy all the tasks and make them transition to running. If the 
> parallelisms are 16k, it may take more than 2 minutes.
> As the creation of TaskDeploymentDescriptors runs in the main thread of the 
> jobmanager, the jobmanager cannot deal with other akka messages such as 
> heartbeats and task status updates for more than two minutes.
>  
> All in all, currently there are two issues in the deployment of tasks for 
> large scale jobs:
>  # It takes a long time to deploy tasks, especially for all-to-all edges.
>  # Heartbeat timeouts may happen during or after task deployment. For a 
> streaming job, this would cause a failover of the entire region. The job may 
> never transition to running, since another heartbeat timeout could occur 
> while the new tasks are being deployed.
> h3. Proposal
> Task deployment involves the following procedures:
>  # Jobmanager creates TaskDeploymentDescriptor for each task in the main 
> thread
>  # TaskDeploymentDescriptor is serialized in the future executor
>  # Akka transports TaskDeploymentDescriptors to TaskExecutors via RPC call
>  # TaskExecutors create a new task thread and execute it
> The optimization contains two parts:
> *1. Cache the compressed serialized value of ShuffleDescriptors*
> ShuffleDescriptors in the TaskDeploymentDescriptor describe the 
> IntermediateResultPartitions that a task consumes. For downstream vertices 
> connected by an all-to-all edge with parallelism _N_, we currently calculate 
> _N_ ShuffleDescriptors _N_ times. However, these vertices share the same 
> ShuffleDescriptors since they all consume the same 
> IntermediateResultPartitions, so we don't need to calculate them for each 
> downstream vertex individually; we can just cache them. This decreases the 
> overall complexity of calculating TaskDeploymentDescriptors from O(N^2) to 
> O(N).
> Furthermore, we don't need to serialize the same ShuffleDescriptors _N_ 
> times, so we can cache the serialized value of the ShuffleDescriptors 
> instead of the original objects. To decrease the size of akka messages and 
> reduce the transmission of replicated data over the network, the serialized 
> value can be compressed.
> *2. Distribute the ShuffleDescriptors via blob server*
> For ShuffleDescriptors of vertices with 8k parallelism, the size of their 
> serialized value is more than 700 Kilobytes. After the compression, it would 
> be 200 Kilobytes or so. The overall size of 8k TaskDeploymentDescriptors is 
> more than 1.6 Gigabytes. Since Akka cannot send the messages as fast as the 
> TaskDeploymentDescriptors are created, these TaskDeploymentDescriptors would 
> become a heavy burden for the garbage collector to deal with.
> In TaskDeploymentDescriptor, JobInformation and TaskInformation are 
> distributed via the blob server if their sizes exceed a certain threshold 
> (which is defined as {{blob.offload.minsize}}). TaskExecutors request the 
> information from the blob server once they begin to process the 
> TaskDeploymentDescriptor. This makes sure that the jobmanager doesn't need to 
> keep all the copies in heap memory until all the TaskDeploymentDescriptors 
> are sent. There will be only one copy on the blob server. Like the 
> JobInformation, we can jus

[jira] [Commented] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-21 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17384868#comment-17384868
 ] 

Zhu Zhu commented on FLINK-23218:
-

I think the PR mentioned above should be 
https://github.com/apache/flink/pull/16498.



> Distribute the ShuffleDescriptors via blob server
> -
>
> Key: FLINK-23218
> URL: https://issues.apache.org/jira/browse/FLINK-23218
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhilong Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> h3. Introduction
> The optimizations introduced in FLINK-21110 so far have improved the 
> performance of job initialization, failover and partition releasing. 
> However, task deployment is still slow. For a job with two vertices, each 
> with a parallelism of 8k and connected by an all-to-all edge, it takes 
> 32.611s to deploy all the tasks and make them transition to running. If the 
> parallelisms are 16k, it may take more than 2 minutes.
> As the creation of TaskDeploymentDescriptors runs in the main thread of the 
> jobmanager, the jobmanager cannot deal with other akka messages such as 
> heartbeats and task status updates for more than two minutes.
>  
> All in all, currently there are two issues in the deployment of tasks for 
> large scale jobs:
>  # It takes a long time to deploy tasks, especially for all-to-all edges.
>  # Heartbeat timeouts may happen during or after task deployment. For a 
> streaming job, this would cause a failover of the entire region. The job may 
> never transition to running, since another heartbeat timeout could occur 
> while the new tasks are being deployed.
> h3. Proposal
> Task deployment involves the following procedures:
>  # Jobmanager creates TaskDeploymentDescriptor for each task in the main 
> thread
>  # TaskDeploymentDescriptor is serialized in the future executor
>  # Akka transports TaskDeploymentDescriptors to TaskExecutors via RPC call
>  # TaskExecutors create a new task thread and execute it
> The optimization contains two parts:
> *1. Cache the compressed serialized value of ShuffleDescriptors*
> ShuffleDescriptors in the TaskDeploymentDescriptor describe the 
> IntermediateResultPartitions that a task consumes. For downstream vertices 
> connected by an all-to-all edge with parallelism _N_, we currently calculate 
> _N_ ShuffleDescriptors _N_ times. However, these vertices share the same 
> ShuffleDescriptors since they all consume the same 
> IntermediateResultPartitions, so we don't need to calculate them for each 
> downstream vertex individually; we can just cache them. This decreases the 
> overall complexity of calculating TaskDeploymentDescriptors from O(N^2) to 
> O(N).
> Furthermore, we don't need to serialize the same ShuffleDescriptors _N_ 
> times, so we can cache the serialized value of the ShuffleDescriptors 
> instead of the original objects. To decrease the size of akka messages and 
> reduce the transmission of replicated data over the network, the serialized 
> value can be compressed.
> *2. Distribute the ShuffleDescriptors via blob server*
> For ShuffleDescriptors of vertices with 8k parallelism, the size of their 
> serialized value is more than 700 Kilobytes. After the compression, it would 
> be 200 Kilobytes or so. The overall size of 8k TaskDeploymentDescriptors is 
> more than 1.6 Gigabytes. Since Akka cannot send the messages as fast as the 
> TaskDeploymentDescriptors are created, these TaskDeploymentDescriptors would 
> become a heavy burden for the garbage collector to deal with.
> In TaskDeploymentDescriptor, JobInformation and TaskInformation are 
> distributed via the blob server if their sizes exceed a certain threshold 
> (which is defined as {{blob.offload.minsize}}). TaskExecutors request the 
> information from the blob server once they begin to process the 
> TaskDeploymentDescriptor. This makes sure that the jobmanager doesn't need to 
> keep all the copies in heap memory until all the TaskDeploymentDescriptors 
> are sent. There will be only one copy on the blob server. Like the 
> JobInformation, we can just distribute the cached ShuffleDescriptors via the 
> blob server if their overall size exceeds the threshold.
> h3. Summary
> In summary, the optimization of task deployment is to introduce a cache for 
> the TaskDeploymentDescriptor. We cache the compressed serialized value of 
> ShuffleDescriptors. If the size of the value exceeds a certain threshold, the 
> value would be distributed via the blob server.
> h3. Comparison
> We implemented a POC and conducted an experiment to compare the performance 

[jira] [Closed] (FLINK-22677) Scheduler should invoke ShuffleMaster#registerPartitionWithProducer by a real asynchronous fashion

2021-07-19 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-22677.
---
Resolution: Done

Done via
0d099b79fddc5e254884e44f2167c625744079a4
0b28fadccfb6b0d2a85592ced9e98b03a0c2d3bf

> Scheduler should invoke ShuffleMaster#registerPartitionWithProducer by a real 
> asynchronous fashion
> --
>
> Key: FLINK-22677
> URL: https://issues.apache.org/jira/browse/FLINK-22677
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0
>Reporter: Jin Xing
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> The current scheduler enforces a synchronous registration even though the API 
> of ShuffleMaster#registerPartitionWithProducer returns a CompletableFuture. In 
> a remote shuffle service scenario, the communication between the ShuffleMaster 
> and the remote cluster tends to be expensive. A synchronous registration risks 
> blocking the main thread and might cause negative side effects such as 
> heartbeat timeouts. Additionally, expensive synchronous calls to the remote 
> cluster could bottleneck the throughput of applying for shuffle resources, 
> especially for batch jobs with complicated DAGs.
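An illustrative contrast between a blocking and a truly asynchronous use of the registration future; the descriptor type and method names are placeholders for this sketch, not the actual scheduler code:

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical helper showing how to compose follow-up work onto the
// registration future instead of blocking on it.
public final class AsyncRegistrationSketch {

    public static <D> CompletableFuture<Void> registerThenDeploy(
            CompletableFuture<D> registrationFuture, // e.g. from registerPartitionWithProducer
            Function<D, CompletableFuture<Void>> deploy) {
        // Blocking anti-pattern (what this ticket wants to avoid):
        //   D descriptor = registrationFuture.join(); // stalls the main thread
        // Asynchronous fashion: chain the deployment onto the future instead.
        return registrationFuture.thenCompose(deploy);
    }
}
{code}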



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22672) Some enhancements for pluggable shuffle service framework

2021-07-18 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-22672:
---

Assignee: Jin Xing

> Some enhancements for pluggable shuffle service framework
> -
>
> Key: FLINK-22672
> URL: https://issues.apache.org/jira/browse/FLINK-22672
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Jin Xing
>Assignee: Jin Xing
>Priority: Major
> Fix For: 1.14.0
>
>
> "Pluggable shuffle service" in Flink provides an architecture which are 
> unified for both streaming and batch jobs, allowing user to customize the 
> process of data transfer between shuffle stages according to scenarios.
> There are already a number of implementations of "remote shuffle service" on 
> Spark like [1][2][3]. Remote shuffle enables to shuffle data from/to a remote 
> cluster and achieves benefits like :
>  # The lifecycle of computing resource can be decoupled with shuffle data, 
> once computing task is finished, idle computing nodes can be released with 
> its completed shuffle data accommodated on remote shuffle cluster.
>  # There is no need to reserve disk capacity for shuffle on computing nodes. 
> Remote shuffle cluster serves shuffling request with better scaling ability 
> and alleviates the local disk pressure on computing nodes when data skew.
> Based on "pluggable shuffle service", we build our own "remote shuffle 
> service" on Flink –- Lattice, which targets to provide functionalities and 
> improve performance for batch processing jobs. Basically it works as below:
>  # Lattice cluster works as an independent service for shuffling request;
>  # LatticeShuffleMaster extends ShuffleMaster, works inside JM and talks with 
> remote Lattice cluster for shuffle resource application and shuffle data 
> lifecycle management;
>  # LatticeShuffleEnvironment extends ShuffleEnvironment, works inside TM and 
> provides an environment for shuffling data from/to remote Lattice cluster;
> During the process of building Lattice we find some potential enhancements on 
> "pluggable shuffle service". I will enumerate and create some sub JIRAs under 
> this umbrella
>  
> [1] 
> [https://www.alibabacloud.com/blog/emr-remote-shuffle-service-a-powerful-elastic-tool-of-serverless-spark_597728]
> [2] [https://bestoreo.github.io/post/cosco/cosco/]
> [3] [https://github.com/uber/RemoteShuffleService]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-22676) The partition tracker should support remote shuffle properly

2021-07-18 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-22676.
---
Resolution: Done

Done via 62a342b647fc1eac7f87769be92fda798649d6d4

> The partition tracker should support remote shuffle properly
> 
>
> Key: FLINK-22676
> URL: https://issues.apache.org/jira/browse/FLINK-22676
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Runtime / Network
>Affects Versions: 1.14.0
>Reporter: Jin Xing
>Assignee: Jin Xing
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> In current Flink, a data partition is bound to the ResourceID of the TM in 
> Execution#startTrackingPartitions, and the partition tracker stops tracking 
> the corresponding partitions when a TM disconnects 
> (JobMaster#disconnectTaskManager), i.e. the lifecycle of shuffle data is bound 
> to the computing resource (TM). This works fine for the internal shuffle 
> service, but not for a remote shuffle service. Since shuffle data is stored 
> remotely, the lifecycle of a completed partition can be decoupled from the TM, 
> i.e. a TM can safely be released when no computing task runs on it, and 
> further shuffle read requests can be directed to the remote shuffle cluster. 
> In addition, when a TM is lost, its completed data partitions on the remote 
> shuffle cluster do not need to be reproduced.
>  
> The issue mentioned above exists because Flink's JobMasterPartitionTracker 
> mixes up a partition's locationID (where the partition is located) and tmID 
> (which TM the partition is produced from). In TM-internal shuffle, a 
> partition's locationID is the same as its tmID, but this does not hold for 
> remote shuffle. JobMasterPartitionTracker, as an independent component, should 
> be able to differentiate the locationID and tmID of a partition and thus 
> handle the lifecycle of a partition properly.
> We propose that JobMasterPartitionTracker manages and indexes partitions with 
> both locationID and tmID. The process of registration and unregistration 
> would be as below:
> *A. Partition Registration*
>  # Execution#registerProducedPartitions registers a partition to ShuffleMaster 
> and gets a ShuffleDescriptor. The current 
> ShuffleDescriptor#storesLocalResourcesOn only returns the location of the 
> producing TM if the partition occupies local resources there.
>  We propose to rename this method and always return the locationID of the 
> partition. It might be as below:
> {code:java}
>  ResourceID getLocationID();  {code}
>  # Execution#registerProducedPartitions then registers the partition to 
> JMPartitionTracker with the tmID (ResourceID of the TaskManager from 
> TaskManagerLocation) and the locationID (acquired in step 1). 
> JobMasterPartitionTracker will index a partition with both tmID and 
> locationID;
> *B. Invocations from JM and ShuffleMaster*
>       JobMasterPartitionTracker listens to invocations from both JM and 
> ShuffleMaster.
>  # When JMPartitionTracker hears from JobMaster#disconnectTaskManager that a 
> TM disconnects, it will check whether the disconnected tmID equals the 
> locationID of a partition. If so, tracking of the corresponding partition 
> will be stopped.
>  # When JobMasterPartitionTracker hears from ShuffleMaster that a data 
> location is lost, it will unregister the corresponding partitions by 
> locationID;
> *C. Partition Unregistration*
> When unregistering a partition, JobMasterPartitionTracker first removes the 
> corresponding indexes to tmID and locationID, and then releases the partition 
> according to the shuffle service type:
>  # If the locationID equals the tmID, the partition is accommodated by the 
> TM-internal shuffle service, and JMPartitionTracker invokes 
> TaskExecutorGateway for the release;
>  # If the locationID doesn't equal the tmID, the partition is accommodated by 
> an external shuffle service, and JMPartitionTracker invokes ShuffleMaster for 
> the release;
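
A minimal sketch of the double indexing and the release branching described in
sections A-C above. PartitionTrackerSketch and its helpers are hypothetical
simplifications of JobMasterPartitionTracker, not the actual Flink classes.

{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public class PartitionTrackerSketch {

    // partitionId -> producing TM, and partitionId -> partition location
    private final Map<String, String> partitionToTmId = new HashMap<>();
    private final Map<String, String> partitionToLocationId = new HashMap<>();

    /** A. Registration: index a partition under both the producing TM and its location. */
    public void startTracking(String partitionId, String tmId, String locationId) {
        partitionToTmId.put(partitionId, tmId);
        partitionToLocationId.put(partitionId, locationId);
    }

    /** B.1. A TM disconnected: only stop tracking partitions actually located on that TM. */
    public void onTaskManagerDisconnected(String tmId) {
        Set<String> toStop = new HashSet<>();
        partitionToLocationId.forEach((partition, locationId) -> {
            if (locationId.equals(tmId)) {
                toStop.add(partition);
            }
        });
        toStop.forEach(this::unregister);
    }

    /**
     * C. Unregistration: release via the TM gateway for internal shuffle, via the
     * ShuffleMaster for external/remote shuffle.
     */
    public void unregister(String partitionId) {
        String tmId = partitionToTmId.remove(partitionId);
        String locationId = partitionToLocationId.remove(partitionId);
        if (Objects.equals(tmId, locationId)) {
            releaseViaTaskExecutorGateway(partitionId, tmId);
        } else {
            releaseViaShuffleMaster(partitionId, locationId);
        }
    }

    private void releaseViaTaskExecutorGateway(String partitionId, String tmId) { /* placeholder */ }

    private void releaseViaShuffleMaster(String partitionId, String locationId) { /* placeholder */ }
}
{code}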



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22676) The partition tracker should support remote shuffle properly

2021-07-18 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-22676:

Affects Version/s: (was: 1.4)
   1.14.0

> The partition tracker should support remote shuffle properly
> 
>
> Key: FLINK-22676
> URL: https://issues.apache.org/jira/browse/FLINK-22676
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Runtime / Network
>Affects Versions: 1.14.0
>Reporter: Jin Xing
>Assignee: Jin Xing
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> In current Flink, a data partition is bound to the ResourceID of the TM in 
> Execution#startTrackingPartitions, and the partition tracker stops tracking 
> the corresponding partitions when a TM disconnects 
> (JobMaster#disconnectTaskManager), i.e. the lifecycle of shuffle data is bound 
> to the computing resource (TM). This works fine for the internal shuffle 
> service, but not for a remote shuffle service. Since shuffle data is stored 
> remotely, the lifecycle of a completed partition can be decoupled from the TM, 
> i.e. a TM can safely be released when no computing task runs on it, and 
> further shuffle read requests can be directed to the remote shuffle cluster. 
> In addition, when a TM is lost, its completed data partitions on the remote 
> shuffle cluster do not need to be reproduced.
>  
> The issue mentioned above exists because Flink's JobMasterPartitionTracker 
> mixes up a partition's locationID (where the partition is located) and tmID 
> (which TM the partition is produced from). In TM-internal shuffle, a 
> partition's locationID is the same as its tmID, but this does not hold for 
> remote shuffle. JobMasterPartitionTracker, as an independent component, should 
> be able to differentiate the locationID and tmID of a partition and thus 
> handle the lifecycle of a partition properly.
> We propose that JobMasterPartitionTracker manages and indexes partitions with 
> both locationID and tmID. The process of registration and unregistration 
> would be as below:
> *A. Partition Registration*
>  # Execution#registerProducedPartitions registers a partition to ShuffleMaster 
> and gets a ShuffleDescriptor. The current 
> ShuffleDescriptor#storesLocalResourcesOn only returns the location of the 
> producing TM if the partition occupies local resources there.
>  We propose to rename this method and always return the locationID of the 
> partition. It might be as below:
> {code:java}
>  ResourceID getLocationID();  {code}
>  # Execution#registerProducedPartitions then registers the partition to 
> JMPartitionTracker with the tmID (ResourceID of the TaskManager from 
> TaskManagerLocation) and the locationID (acquired in step 1). 
> JobMasterPartitionTracker will index a partition with both tmID and 
> locationID;
> *B. Invocations from JM and ShuffleMaster*
>       JobMasterPartitionTracker listens to invocations from both JM and 
> ShuffleMaster.
>  # When JMPartitionTracker hears from JobMaster#disconnectTaskManager that a 
> TM disconnects, it will check whether the disconnected tmID equals the 
> locationID of a partition. If so, tracking of the corresponding partition 
> will be stopped.
>  # When JobMasterPartitionTracker hears from ShuffleMaster that a data 
> location is lost, it will unregister the corresponding partitions by 
> locationID;
> *C. Partition Unregistration*
> When unregistering a partition, JobMasterPartitionTracker first removes the 
> corresponding indexes to tmID and locationID, and then releases the partition 
> according to the shuffle service type:
>  # If the locationID equals the tmID, the partition is accommodated by the 
> TM-internal shuffle service, and JMPartitionTracker invokes 
> TaskExecutorGateway for the release;
>  # If the locationID doesn't equal the tmID, the partition is accommodated by 
> an external shuffle service, and JMPartitionTracker invokes ShuffleMaster for 
> the release;



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22676) The partition tracker should support remote shuffle properly

2021-07-18 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-22676:

Fix Version/s: 1.14.0

> The partition tracker should support remote shuffle properly
> 
>
> Key: FLINK-22676
> URL: https://issues.apache.org/jira/browse/FLINK-22676
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Runtime / Network
>Affects Versions: 1.4
>Reporter: Jin Xing
>Assignee: Jin Xing
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> In current Flink, a data partition is bound to the ResourceID of the TM in 
> Execution#startTrackingPartitions, and the partition tracker stops tracking 
> the corresponding partitions when a TM disconnects 
> (JobMaster#disconnectTaskManager), i.e. the lifecycle of shuffle data is bound 
> to the computing resource (TM). This works fine for the internal shuffle 
> service, but not for a remote shuffle service. Since shuffle data is stored 
> remotely, the lifecycle of a completed partition can be decoupled from the TM, 
> i.e. a TM can safely be released when no computing task runs on it, and 
> further shuffle read requests can be directed to the remote shuffle cluster. 
> In addition, when a TM is lost, its completed data partitions on the remote 
> shuffle cluster do not need to be reproduced.
>  
> The issue mentioned above exists because Flink's JobMasterPartitionTracker 
> mixes up a partition's locationID (where the partition is located) and tmID 
> (which TM the partition is produced from). In TM-internal shuffle, a 
> partition's locationID is the same as its tmID, but this does not hold for 
> remote shuffle. JobMasterPartitionTracker, as an independent component, should 
> be able to differentiate the locationID and tmID of a partition and thus 
> handle the lifecycle of a partition properly.
> We propose that JobMasterPartitionTracker manages and indexes partitions with 
> both locationID and tmID. The process of registration and unregistration 
> would be as below:
> *A. Partition Registration*
>  # Execution#registerProducedPartitions registers a partition to ShuffleMaster 
> and gets a ShuffleDescriptor. The current 
> ShuffleDescriptor#storesLocalResourcesOn only returns the location of the 
> producing TM if the partition occupies local resources there.
>  We propose to rename this method and always return the locationID of the 
> partition. It might be as below:
> {code:java}
>  ResourceID getLocationID();  {code}
>  # Execution#registerProducedPartitions then registers the partition to 
> JMPartitionTracker with the tmID (ResourceID of the TaskManager from 
> TaskManagerLocation) and the locationID (acquired in step 1). 
> JobMasterPartitionTracker will index a partition with both tmID and 
> locationID;
> *B. Invocations from JM and ShuffleMaster*
>       JobMasterPartitionTracker listens to invocations from both JM and 
> ShuffleMaster.
>  # When JMPartitionTracker hears from JobMaster#disconnectTaskManager that a 
> TM disconnects, it will check whether the disconnected tmID equals the 
> locationID of a partition. If so, tracking of the corresponding partition 
> will be stopped.
>  # When JobMasterPartitionTracker hears from ShuffleMaster that a data 
> location is lost, it will unregister the corresponding partitions by 
> locationID;
> *C. Partition Unregistration*
> When unregistering a partition, JobMasterPartitionTracker first removes the 
> corresponding indexes to tmID and locationID, and then releases the partition 
> according to the shuffle service type:
>  # If the locationID equals the tmID, the partition is accommodated by the 
> TM-internal shuffle service, and JMPartitionTracker invokes 
> TaskExecutorGateway for the release;
>  # If the locationID doesn't equal the tmID, the partition is accommodated by 
> an external shuffle service, and JMPartitionTracker invokes ShuffleMaster for 
> the release;



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22676) The partition tracker should support remote shuffle properly

2021-07-18 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-22676:
---

Assignee: Jin Xing

> The partition tracker should support remote shuffle properly
> 
>
> Key: FLINK-22676
> URL: https://issues.apache.org/jira/browse/FLINK-22676
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Runtime / Network
>Reporter: Jin Xing
>Assignee: Jin Xing
>Priority: Major
>  Labels: pull-request-available
>
> In current Flink, a data partition is bound to the ResourceID of the TM in 
> Execution#startTrackingPartitions, and the partition tracker stops tracking 
> the corresponding partitions when a TM disconnects 
> (JobMaster#disconnectTaskManager), i.e. the lifecycle of shuffle data is bound 
> to the computing resource (TM). This works fine for the internal shuffle 
> service, but not for a remote shuffle service. Since shuffle data is stored 
> remotely, the lifecycle of a completed partition can be decoupled from the TM, 
> i.e. a TM can safely be released when no computing task runs on it, and 
> further shuffle read requests can be directed to the remote shuffle cluster. 
> In addition, when a TM is lost, its completed data partitions on the remote 
> shuffle cluster do not need to be reproduced.
>  
> The issue mentioned above exists because Flink's JobMasterPartitionTracker 
> mixes up a partition's locationID (where the partition is located) and tmID 
> (which TM the partition is produced from). In TM-internal shuffle, a 
> partition's locationID is the same as its tmID, but this does not hold for 
> remote shuffle. JobMasterPartitionTracker, as an independent component, should 
> be able to differentiate the locationID and tmID of a partition and thus 
> handle the lifecycle of a partition properly.
> We propose that JobMasterPartitionTracker manages and indexes partitions with 
> both locationID and tmID. The process of registration and unregistration 
> would be as below:
> *A. Partition Registration*
>  # Execution#registerProducedPartitions registers a partition to ShuffleMaster 
> and gets a ShuffleDescriptor. The current 
> ShuffleDescriptor#storesLocalResourcesOn only returns the location of the 
> producing TM if the partition occupies local resources there.
>  We propose to rename this method and always return the locationID of the 
> partition. It might be as below:
> {code:java}
>  ResourceID getLocationID();  {code}
>  # Execution#registerProducedPartitions then registers the partition to 
> JMPartitionTracker with the tmID (ResourceID of the TaskManager from 
> TaskManagerLocation) and the locationID (acquired in step 1). 
> JobMasterPartitionTracker will index a partition with both tmID and 
> locationID;
> *B. Invocations from JM and ShuffleMaster*
>       JobMasterPartitionTracker listens to invocations from both JM and 
> ShuffleMaster.
>  # When JMPartitionTracker hears from JobMaster#disconnectTaskManager that a 
> TM disconnects, it will check whether the disconnected tmID equals the 
> locationID of a partition. If so, tracking of the corresponding partition 
> will be stopped.
>  # When JobMasterPartitionTracker hears from ShuffleMaster that a data 
> location is lost, it will unregister the corresponding partitions by 
> locationID;
> *C. Partition Unregistration*
> When unregistering a partition, JobMasterPartitionTracker first removes the 
> corresponding indexes to tmID and locationID, and then releases the partition 
> according to the shuffle service type:
>  # If the locationID equals the tmID, the partition is accommodated by the 
> TM-internal shuffle service, and JMPartitionTracker invokes 
> TaskExecutorGateway for the release;
>  # If the locationID doesn't equal the tmID, the partition is accommodated by 
> an external shuffle service, and JMPartitionTracker invokes ShuffleMaster for 
> the release;



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22676) The partition tracker should support remote shuffle properly

2021-07-18 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-22676:

Affects Version/s: 1.4

> The partition tracker should support remote shuffle properly
> 
>
> Key: FLINK-22676
> URL: https://issues.apache.org/jira/browse/FLINK-22676
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Runtime / Network
>Affects Versions: 1.4
>Reporter: Jin Xing
>Assignee: Jin Xing
>Priority: Major
>  Labels: pull-request-available
>
> In current Flink, a data partition is bound to the ResourceID of the TM in 
> Execution#startTrackingPartitions, and the partition tracker stops tracking 
> the corresponding partitions when a TM disconnects 
> (JobMaster#disconnectTaskManager), i.e. the lifecycle of shuffle data is bound 
> to the computing resource (TM). This works fine for the internal shuffle 
> service, but not for a remote shuffle service. Since shuffle data is stored 
> remotely, the lifecycle of a completed partition can be decoupled from the TM, 
> i.e. a TM can safely be released when no computing task runs on it, and 
> further shuffle read requests can be directed to the remote shuffle cluster. 
> In addition, when a TM is lost, its completed data partitions on the remote 
> shuffle cluster do not need to be reproduced.
>  
> The issue mentioned above exists because Flink's JobMasterPartitionTracker 
> mixes up a partition's locationID (where the partition is located) and tmID 
> (which TM the partition is produced from). In TM-internal shuffle, a 
> partition's locationID is the same as its tmID, but this does not hold for 
> remote shuffle. JobMasterPartitionTracker, as an independent component, should 
> be able to differentiate the locationID and tmID of a partition and thus 
> handle the lifecycle of a partition properly.
> We propose that JobMasterPartitionTracker manages and indexes partitions with 
> both locationID and tmID. The process of registration and unregistration 
> would be as below:
> *A. Partition Registration*
>  # Execution#registerProducedPartitions registers a partition to ShuffleMaster 
> and gets a ShuffleDescriptor. The current 
> ShuffleDescriptor#storesLocalResourcesOn only returns the location of the 
> producing TM if the partition occupies local resources there.
>  We propose to rename this method and always return the locationID of the 
> partition. It might be as below:
> {code:java}
>  ResourceID getLocationID();  {code}
>  # Execution#registerProducedPartitions then registers the partition to 
> JMPartitionTracker with the tmID (ResourceID of the TaskManager from 
> TaskManagerLocation) and the locationID (acquired in step 1). 
> JobMasterPartitionTracker will index a partition with both tmID and 
> locationID;
> *B. Invocations from JM and ShuffleMaster*
>       JobMasterPartitionTracker listens to invocations from both JM and 
> ShuffleMaster.
>  # When JMPartitionTracker hears from JobMaster#disconnectTaskManager that a 
> TM disconnects, it will check whether the disconnected tmID equals the 
> locationID of a partition. If so, tracking of the corresponding partition 
> will be stopped.
>  # When JobMasterPartitionTracker hears from ShuffleMaster that a data 
> location is lost, it will unregister the corresponding partitions by 
> locationID;
> *C. Partition Unregistration*
> When unregistering a partition, JobMasterPartitionTracker first removes the 
> corresponding indexes to tmID and locationID, and then releases the partition 
> according to the shuffle service type:
>  # If the locationID equals the tmID, the partition is accommodated by the 
> TM-internal shuffle service, and JMPartitionTracker invokes 
> TaskExecutorGateway for the release;
>  # If the locationID doesn't equal the tmID, the partition is accommodated by 
> an external shuffle service, and JMPartitionTracker invokes ShuffleMaster for 
> the release;



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (FLINK-22017) Regions may never be scheduled when there are cross-region blocking edges

2021-07-15 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-22017:

Comment: was deleted

(was: I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I 
help the community manage its development. I see this issue has been marked as 
Critical but is unassigned and neither itself nor its Sub-Tasks have been 
updated for 7 days. I have gone ahead and marked it "stale-critical". If this 
ticket is critical, please either assign yourself or give an update. 
Afterwards, please remove the label or in 7 days the issue will be 
deprioritized.
)

> Regions may never be scheduled when there are cross-region blocking edges
> -
>
> Key: FLINK-22017
> URL: https://issues.apache.org/jira/browse/FLINK-22017
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.11.3, 1.12.2, 1.13.0
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.14.0
>
> Attachments: Illustration.jpg
>
>
> For a topology with cross-region blocking edges, there are regions that may 
> never be scheduled. The case is illustrated in the figure below.
> !Illustration.jpg!
> Let's denote the vertices as v{layer}_{number}. The edge connecting v2_2 and 
> v3_2 crosses region 1 and region 2. Since region 1 has no blocking edges 
> connected to other regions, it will be scheduled first. When vertex2_2 is 
> finished, PipelinedRegionSchedulingStrategy will trigger 
> {{onExecutionStateChange}} for it.
> One would expect region 2 to be scheduled, since all its consumed partitions 
> are consumable. But in fact region 2 won't be scheduled, because the result 
> partition of vertex2_2 is not tagged as consumable. Whether it is consumable 
> or not is determined by its IntermediateDataSet.
> However, an IntermediateDataSet is consumable if and only if all the 
> producers of its IntermediateResultPartitions are finished. This 
> IntermediateDataSet will never be consumable since vertex2_3 is not 
> scheduled. All in all, this forms a deadlock: a region will never be 
> scheduled because it has not been scheduled.
> As a solution we should let BLOCKING result partitions be consumable 
> individually. Note that this will make the scheduling execution-vertex-wise 
> instead of stage-wise, with a nice side effect of better resource 
> utilization. The PipelinedRegionSchedulingStrategy can be simplified along 
> with this change to get rid of the correlatedResultPartitions.
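
A minimal sketch of the per-partition consumability proposed above, in contrast
to the dataset-level check that causes the deadlock. The class and method names
are hypothetical stand-ins, not the actual PipelinedRegionSchedulingStrategy
internals.

{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PartitionConsumableSketch {

    // partitionId -> regions that consume this partition
    private final Map<String, List<String>> partitionConsumers = new HashMap<>();
    // regionId -> partitions the region consumes
    private final Map<String, Set<String>> regionToConsumedPartitions = new HashMap<>();
    private final Set<String> finishedPartitions = new HashSet<>();

    /**
     * With dataset-level consumability, a consumer region waits until ALL producers of the
     * IntermediateDataSet are finished, which never happens if one producer lives in a region
     * that is itself waiting (the deadlock described above). With per-partition consumability,
     * a finished BLOCKING partition immediately counts towards unblocking its consumers.
     */
    public void onPartitionFinished(String partitionId) {
        finishedPartitions.add(partitionId);
        for (String region : partitionConsumers.getOrDefault(partitionId, List.of())) {
            if (allConsumedPartitionsFinished(region)) {
                scheduleRegion(region);
            }
        }
    }

    private boolean allConsumedPartitionsFinished(String region) {
        return finishedPartitions.containsAll(
                regionToConsumedPartitions.getOrDefault(region, Set.of()));
    }

    private void scheduleRegion(String region) {
        // Placeholder: the real scheduler would submit the region's tasks here.
        System.out.println("Scheduling region " + region);
    }
}
{code}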



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (FLINK-22017) Regions may never be scheduled when there are cross-region blocking edges

2021-07-15 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-22017:

Comment: was deleted

(was: This critical issue is unassigned, and neither it nor any of its Sub-Tasks 
has been updated for 7 days. So, it has been labeled "stale-critical". If 
this ticket is indeed critical, please either assign yourself or give an 
update. Afterwards, please remove the label. In 7 days the issue will be 
deprioritized.)

> Regions may never be scheduled when there are cross-region blocking edges
> -
>
> Key: FLINK-22017
> URL: https://issues.apache.org/jira/browse/FLINK-22017
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.11.3, 1.12.2, 1.13.0
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.14.0
>
> Attachments: Illustration.jpg
>
>
> For a topology with cross-region blocking edges, there are regions that may 
> never be scheduled. The case is illustrated in the figure below.
> !Illustration.jpg!
> Let's denote the vertices as v{layer}_{number}. The edge connecting v2_2 and 
> v3_2 crosses region 1 and region 2. Since region 1 has no blocking edges 
> connected to other regions, it will be scheduled first. When vertex2_2 is 
> finished, PipelinedRegionSchedulingStrategy will trigger 
> {{onExecutionStateChange}} for it.
> One would expect region 2 to be scheduled, since all its consumed partitions 
> are consumable. But in fact region 2 won't be scheduled, because the result 
> partition of vertex2_2 is not tagged as consumable. Whether it is consumable 
> or not is determined by its IntermediateDataSet.
> However, an IntermediateDataSet is consumable if and only if all the 
> producers of its IntermediateResultPartitions are finished. This 
> IntermediateDataSet will never be consumable since vertex2_3 is not 
> scheduled. All in all, this forms a deadlock: a region will never be 
> scheduled because it has not been scheduled.
> As a solution we should let BLOCKING result partitions be consumable 
> individually. Note that this will make the scheduling execution-vertex-wise 
> instead of stage-wise, with a nice side effect of better resource 
> utilization. The PipelinedRegionSchedulingStrategy can be simplified along 
> with this change to get rid of the correlatedResultPartitions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (FLINK-22017) Regions may never be scheduled when there are cross-region blocking edges

2021-07-15 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-22017:

Comment: was deleted

(was: I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I 
help the community manage its development. I see this issue has been marked as 
Major but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 30 days. I have gone ahead and added the "stale-major" label to the issue. If this 
ticket is a Major, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.
)

> Regions may never be scheduled when there are cross-region blocking edges
> -
>
> Key: FLINK-22017
> URL: https://issues.apache.org/jira/browse/FLINK-22017
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.11.3, 1.12.2, 1.13.0
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.14.0
>
> Attachments: Illustration.jpg
>
>
> For a topology with cross-region blocking edges, there are regions that may 
> never be scheduled. The case is illustrated in the figure below.
> !Illustration.jpg!
> Let's denote the vertices as v{layer}_{number}. The edge connecting v2_2 and 
> v3_2 crosses region 1 and region 2. Since region 1 has no blocking edges 
> connected to other regions, it will be scheduled first. When vertex2_2 is 
> finished, PipelinedRegionSchedulingStrategy will trigger 
> {{onExecutionStateChange}} for it.
> One would expect region 2 to be scheduled, since all its consumed partitions 
> are consumable. But in fact region 2 won't be scheduled, because the result 
> partition of vertex2_2 is not tagged as consumable. Whether it is consumable 
> or not is determined by its IntermediateDataSet.
> However, an IntermediateDataSet is consumable if and only if all the 
> producers of its IntermediateResultPartitions are finished. This 
> IntermediateDataSet will never be consumable since vertex2_3 is not 
> scheduled. All in all, this forms a deadlock: a region will never be 
> scheduled because it has not been scheduled.
> As a solution we should let BLOCKING result partitions be consumable 
> individually. Note that this will make the scheduling execution-vertex-wise 
> instead of stage-wise, with a nice side effect of better resource 
> utilization. The PipelinedRegionSchedulingStrategy can be simplified along 
> with this change to get rid of the correlatedResultPartitions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (FLINK-22017) Regions may never be scheduled when there are cross-region blocking edges

2021-07-15 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-22017:

Comment: was deleted

(was: This issue was labeled "stale-critical" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Critical, 
please raise the priority and ask a committer to assign you the issue or revive 
the public discussion.
)

> Regions may never be scheduled when there are cross-region blocking edges
> -
>
> Key: FLINK-22017
> URL: https://issues.apache.org/jira/browse/FLINK-22017
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.11.3, 1.12.2, 1.13.0
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.14.0
>
> Attachments: Illustration.jpg
>
>
> For a topology with cross-region blocking edges, there are regions that may 
> never be scheduled. The case is illustrated in the figure below.
> !Illustration.jpg!
> Let's denote the vertices as v{layer}_{number}. The edge connecting v2_2 and 
> v3_2 crosses region 1 and region 2. Since region 1 has no blocking edges 
> connected to other regions, it will be scheduled first. When vertex2_2 is 
> finished, PipelinedRegionSchedulingStrategy will trigger 
> {{onExecutionStateChange}} for it.
> One would expect region 2 to be scheduled, since all its consumed partitions 
> are consumable. But in fact region 2 won't be scheduled, because the result 
> partition of vertex2_2 is not tagged as consumable. Whether it is consumable 
> or not is determined by its IntermediateDataSet.
> However, an IntermediateDataSet is consumable if and only if all the 
> producers of its IntermediateResultPartitions are finished. This 
> IntermediateDataSet will never be consumable since vertex2_3 is not 
> scheduled. All in all, this forms a deadlock: a region will never be 
> scheduled because it has not been scheduled.
> As a solution we should let BLOCKING result partitions be consumable 
> individually. Note that this will make the scheduling execution-vertex-wise 
> instead of stage-wise, with a nice side effect of better resource 
> utilization. The PipelinedRegionSchedulingStrategy can be simplified along 
> with this change to get rid of the correlatedResultPartitions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-22017) Regions may never be scheduled when there are cross-region blocking edges

2021-07-15 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-22017.
---
  Assignee: Zhilong Hong
Resolution: Fixed

Fixed via 
d2005268b1eeb0fe928b69c5e56ca54862fbf508
eb8100f7afe1cd2b6fceb55b174de097db752fc7

> Regions may never be scheduled when there are cross-region blocking edges
> -
>
> Key: FLINK-22017
> URL: https://issues.apache.org/jira/browse/FLINK-22017
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.11.3, 1.12.2, 1.13.0
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.14.0
>
> Attachments: Illustration.jpg
>
>
> For a topology with cross-region blocking edges, there are regions that may 
> never be scheduled. The case is illustrated in the figure below.
> !Illustration.jpg!
> Let's denote the vertices as v{layer}_{number}. The edge connecting v2_2 and 
> v3_2 crosses region 1 and region 2. Since region 1 has no blocking edges 
> connected to other regions, it will be scheduled first. When vertex2_2 is 
> finished, PipelinedRegionSchedulingStrategy will trigger 
> {{onExecutionStateChange}} for it.
> One would expect region 2 to be scheduled, since all its consumed partitions 
> are consumable. But in fact region 2 won't be scheduled, because the result 
> partition of vertex2_2 is not tagged as consumable. Whether it is consumable 
> or not is determined by its IntermediateDataSet.
> However, an IntermediateDataSet is consumable if and only if all the 
> producers of its IntermediateResultPartitions are finished. This 
> IntermediateDataSet will never be consumable since vertex2_3 is not 
> scheduled. All in all, this forms a deadlock: a region will never be 
> scheduled because it has not been scheduled.
> As a solution we should let BLOCKING result partitions be consumable 
> individually. Note that this will make the scheduling execution-vertex-wise 
> instead of stage-wise, with a nice side effect of better resource 
> utilization. The PipelinedRegionSchedulingStrategy can be simplified along 
> with this change to get rid of the correlatedResultPartitions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-11634) Translate "State Backends" page into Chinese

2021-07-13 Thread Shen Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17380275#comment-17380275
 ] 

Shen Zhu commented on FLINK-11634:
--

Hey [~jark], 

In the latest version of Flink, it seems "state_backends.md" has been moved to 
"https://github.com/apache/flink/blob/master/docs/content/docs/ops/state/state_backends.md",
 and some parts of this page have not been translated to Chinese yet. Can I work 
on this ticket?

Thanks!

> Translate "State Backends" page into Chinese
> 
>
> Key: FLINK-11634
> URL: https://issues.apache.org/jira/browse/FLINK-11634
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Congxian Qiu
>Priority: Major
>  Labels: auto-unassigned
>
> The doc is located in flink/docs/dev/stream/state/state_backends.md



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-11627) Translate the "JobManager High Availability (HA)" page into Chinese

2021-07-12 Thread Shen Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17379579#comment-17379579
 ] 

Shen Zhu commented on FLINK-11627:
--

Hey [~jark], in the latest version, 
[https://github.com/apache/flink/blob/master/docs/ops/jobmanager_high_availability.md]
 has been replaced by 
[https://github.com/apache/flink/blob/master/docs/content/docs/deployment/ha/overview.md],
 and there's already a page for the Chinese translation: 
[https://github.com/apache/flink/blob/master/docs/content.zh/docs/deployment/ha/overview.md],
 so perhaps this ticket could be closed.

> Translate the "JobManager High Availability (HA)" page into Chinese
> ---
>
> Key: FLINK-11627
> URL: https://issues.apache.org/jira/browse/FLINK-11627
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Hui Zhao
>Assignee: Shen Zhu
>Priority: Major
>  Labels: auto-unassigned
>  Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> The page url is 
> https://ci.apache.org/projects/flink/flink-docs-master/ops/jobmanager_high_availability.html
> The markdown file is located in 
> https://github.com/apache/flink/blob/master/docs/ops/jobmanager_high_availability.md
> The markdown file will be created once FLINK-11529 is merged.
> You can reference the translation from: 
> https://github.com/flink-china/1.6.0/blob/master/ops/jobmanager_high_availability.md



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-11627) Translate the "JobManager High Availability (HA)" page into Chinese

2021-07-10 Thread Shen Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17378554#comment-17378554
 ] 

Shen Zhu commented on FLINK-11627:
--

Hey [~jark], could you please assign this ticket to me? I can work on it

> Translate the "JobManager High Availability (HA)" page into Chinese
> ---
>
> Key: FLINK-11627
> URL: https://issues.apache.org/jira/browse/FLINK-11627
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Hui Zhao
>Priority: Major
>  Labels: auto-unassigned
>  Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> The page url is 
> https://ci.apache.org/projects/flink/flink-docs-master/ops/jobmanager_high_availability.html
> The markdown file is located in 
> https://github.com/apache/flink/blob/master/docs/ops/jobmanager_high_availability.md
> The markdown file will be created once FLINK-11529 is merged.
> You can reference the translation from: 
> https://github.com/flink-china/1.6.0/blob/master/ops/jobmanager_high_availability.md



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-09 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17377963#comment-17377963
 ] 

Zhu Zhu commented on FLINK-23218:
-

I gave it another thought and 10GB sounds good to me now, if we always want 
users to limit the size when configuring "blob.offload.minsize" for large scale 
jobs. Adding a limit by default is always better than no limit.

> Distribute the ShuffleDescriptors via blob server
> -
>
> Key: FLINK-23218
> URL: https://issues.apache.org/jira/browse/FLINK-23218
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhilong Hong
>Priority: Major
> Fix For: 1.14.0
>
>
> h3. Introduction
> The optimizations introduced in FLINK-21110 so far have improved the 
> performance of job initialization, failover and partition releasing. 
> However, task deployment is still slow. For a job with two vertices, each 
> with a parallelism of 8k and connected by an all-to-all edge, it takes 
> 32.611s to deploy all the tasks and make them transition to running. If the 
> parallelisms are 16k, it may take more than 2 minutes.
> As the creation of TaskDeploymentDescriptors runs in the main thread of the 
> jobmanager, the jobmanager cannot deal with other Akka messages, such as 
> heartbeats and task status updates, for more than two minutes.
>  
> All in all, currently there are two issues in the deployment of tasks for 
> large scale jobs:
>  # It takes a long time to deploy tasks, especially for all-to-all edges.
>  # Heartbeat timeout may happen during or after the procedure of task 
> deployments. For the streaming job, it would cause the failover of the entire 
> region. The job may never transition to running since there would be another 
> heartbeat timeout during the procedure of new task deployments.
> h3. Proposal
> Task deployment involves the following procedures:
>  # Jobmanager creates TaskDeploymentDescriptor for each task in the main 
> thread
>  # TaskDeploymentDescriptor is serialized in the future executor
>  # Akka transports TaskDeploymentDescriptors to TaskExecutors via RPC call
>  # TaskExecutors create a new task thread and execute it
> The optimization contains two parts:
> *1. Cache the compressed serialized value of ShuffleDescriptors*
> ShuffleDescriptors in the TaskDeploymentDescriptor are used to describe the 
> IntermediateResultPartitions that a task consumes. For the downstream 
> vertices connected with the all-to-all edge that has _N_ parallelism, we need 
> to calculate _N_ ShuffleDescriptors for _N_ times. However, for these 
> vertices, they share the same ShuffleDescriptors since they all consume the 
> same IntermediateResultPartitions. We don't need to calculate 
> ShuffleDescriptors for each downstream vertex individually. We can just cache 
> them. This will decrease the overall complexity of calculating 
> TaskDeploymentDescriptors from O(N^2) to O(N).
> Furthermore, we don't need to serialize the same ShuffleDescriptors _N_ 
> times, so we can just cache the serialized value of ShuffleDescriptors 
> instead of the original objects. To decrease the size of Akka messages and 
> reduce the transmission of replicated data over the network, these serialized 
> values can be compressed.
> *2. Distribute the ShuffleDescriptors via blob server*
> For ShuffleDescriptors of vertices with 8k parallelism, the size of their 
> serialized value is more than 700 Kilobytes. After the compression, it would 
> be 200 Kilobytes or so. The overall size of 8k TaskDeploymentDescriptors is 
> more than 1.6 Gigabytes. Since Akka cannot send the messages as fast as the 
> TaskDeploymentDescriptors are created, these TaskDeploymentDescriptors would 
> become a heavy burden for the garbage collector to deal with.
> In TaskDeploymentDescriptor, JobInformation and TaskInformation are 
> distributed via the blob server if their sizes exceed a certain threshold 
> (which is defined as {{blob.offload.minsize}}). TaskExecutors request the 
> information from the blob server once they begin to process the 
> TaskDeploymentDescriptor. This makes sure the jobmanager doesn't need to keep 
> all the copies in heap memory until all the TaskDeploymentDescriptors are 
> sent. There will be only one copy on the blob server. Like the 
> JobInformation, we can just distribute the cached ShuffleDescriptors via the 
> blob server if their overall size has exceeded the threshold.
> h3. Summary
> In summary, the optimization of task deployment is to introduce a cache for 
> the TaskDeploymentDescriptor. We cache the compressed serialized value of 
> ShuffleDescriptors. If the size of the value exceeds a certain threshold, the 
> value would be distributed via the blob server.
> h3. Compar
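
A minimal sketch of the two parts of the proposal: serialize the shared
ShuffleDescriptors once, compress and cache the result, and offload it via the
blob server only when the compressed size exceeds a threshold. BlobOffloader
and the 1 MiB threshold are assumptions made for this sketch (the threshold
mirrors the 1MB blob offloading value mentioned in the comments above); only
JDK serialization and GZIP are used here, not Flink's serialization utilities.

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.zip.GZIPOutputStream;

public class ShuffleDescriptorCacheSketch {

    /** Hypothetical hook for offloading large payloads to the blob server. */
    interface BlobOffloader {
        String put(byte[] payload) throws IOException; // returns a blob key
    }

    // Assumed threshold, analogous in spirit to blob.offload.minsize.
    private static final int OFFLOAD_THRESHOLD_BYTES = 1 << 20; // 1 MiB

    // datasetId -> compressed serialized ShuffleDescriptors, computed once per IntermediateDataSet
    private final Map<String, byte[]> cache = new ConcurrentHashMap<>();
    private final BlobOffloader blobOffloader;

    public ShuffleDescriptorCacheSketch(BlobOffloader blobOffloader) {
        this.blobOffloader = blobOffloader;
    }

    /** Returns either the compressed bytes (small case) or a blob key (offloaded case). */
    public Object descriptorsForDeployment(String datasetId, Serializable shuffleDescriptors)
            throws IOException {
        byte[] compressed =
                cache.computeIfAbsent(datasetId, id -> serializeAndCompress(shuffleDescriptors));
        if (compressed.length >= OFFLOAD_THRESHOLD_BYTES) {
            // One copy on the blob server instead of N copies in the JM heap / Akka messages.
            return blobOffloader.put(compressed);
        }
        return compressed;
    }

    private static byte[] serializeAndCompress(Serializable value) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                GZIPOutputStream gzip = new GZIPOutputStream(bytes)) {
            try (ObjectOutputStream out = new ObjectOutputStream(gzip)) {
                out.writeObject(value);
            }
            // Closing the ObjectOutputStream finishes the GZIP stream, so the buffer is complete.
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException("Failed to serialize ShuffleDescriptors", e);
        }
    }
}
{code}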

[jira] [Commented] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-09 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17377961#comment-17377961
 ] 

Zhu Zhu commented on FLINK-23218:
-

10GB looks a bit too large as a default blob size limit.
If we want the limit to be very large by default, maybe we can make it 
unlimited by default? Users who are changing the configuration 
"blob.offload.minsize" for large scale jobs should be aware of the residual 
issue and set the size limit.

> Distribute the ShuffleDescriptors via blob server
> -
>
> Key: FLINK-23218
> URL: https://issues.apache.org/jira/browse/FLINK-23218
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhilong Hong
>Priority: Major
> Fix For: 1.14.0
>
>
> h3. Introduction
> The optimizations introduced in FLINK-21110 so far have improved the 
> performance of job initialization, failover and partition releasing. 
> However, task deployment is still slow. For a job with two vertices, each 
> with a parallelism of 8k and connected by an all-to-all edge, it takes 
> 32.611s to deploy all the tasks and make them transition to running. If the 
> parallelisms are 16k, it may take more than 2 minutes.
> As the creation of TaskDeploymentDescriptors runs in the main thread of the 
> jobmanager, the jobmanager cannot deal with other Akka messages, such as 
> heartbeats and task status updates, for more than two minutes.
>  
> All in all, currently there are two issues in the deployment of tasks for 
> large scale jobs:
>  # It takes a long time to deploy tasks, especially for all-to-all edges.
>  # Heartbeat timeout may happen during or after the procedure of task 
> deployments. For the streaming job, it would cause the failover of the entire 
> region. The job may never transition to running since there would be another 
> heartbeat timeout during the procedure of new task deployments.
> h3. Proposal
> Task deployment involves the following procedures:
>  # Jobmanager creates TaskDeploymentDescriptor for each task in the main 
> thread
>  # TaskDeploymentDescriptor is serialized in the future executor
>  # Akka transports TaskDeploymentDescriptors to TaskExecutors via RPC call
>  # TaskExecutors create a new task thread and execute it
> The optimization contains two parts:
> *1. Cache the compressed serialized value of ShuffleDescriptors*
> ShuffleDescriptors in the TaskDeploymentDescriptor are used to describe the 
> IntermediateResultPartitions that a task consumes. For the downstream 
> vertices connected with the all-to-all edge that has _N_ parallelism, we need 
> to calculate _N_ ShuffleDescriptors for _N_ times. However, for these 
> vertices, they share the same ShuffleDescriptors since they all consume the 
> same IntermediateResultPartitions. We don't need to calculate 
> ShuffleDescriptors for each downstream vertex individually. We can just cache 
> them. This will decrease the overall complexity of calculating 
> TaskDeploymentDescriptors from O(N^2) to O(N).
> Furthermore, we don't need to serialize the same ShuffleDescriptors _N_ 
> times, so we can just cache the serialized value of ShuffleDescriptors 
> instead of the original objects. To decrease the size of Akka messages and 
> reduce the transmission of replicated data over the network, these serialized 
> values can be compressed.
> *2. Distribute the ShuffleDescriptors via blob server*
> For ShuffleDescriptors of vertices with 8k parallelism, the size of their 
> serialized value is more than 700 Kilobytes. After the compression, it would 
> be 200 Kilobytes or so. The overall size of 8k TaskDeploymentDescriptors is 
> more than 1.6 Gigabytes. Since Akka cannot send the messages as fast as the 
> TaskDeploymentDescriptors are created, these TaskDeploymentDescriptors would 
> become a heavy burden for the garbage collector to deal with.
> In TaskDeploymentDescriptor, JobInformation and TaskInformation are 
> distributed via the blob server if their sizes exceed a certain threshold 
> (which is defined as {{blob.offload.minsize}}). TaskExecutors request the 
> information from the blob server once they begin to process the 
> TaskDeploymentDescriptor. This makes sure the jobmanager doesn't need to keep 
> all the copies in heap memory until all the TaskDeploymentDescriptors are 
> sent. There will be only one copy on the blob server. Like the 
> JobInformation, we can just distribute the cached ShuffleDescriptors via the 
> blob server if their overall size has exceeded the threshold.
> h3. Summary
> In summary, the optimization of task deployment is to introduce a cache for 
> the TaskDeploymentDescriptor. We cache the compressed serialized value of 
> ShuffleDescriptors. If the size of the value exceeds a cert

[jira] [Commented] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-08 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17377811#comment-17377811
 ] 

Zhu Zhu commented on FLINK-23218:
-

1. To not affect existing users, I prefer the limit not to be too small, 
otherwise deployment performance may be affected and more downloads can result 
in heavier master/DFS loads. 1GB sounds good to me. Note that for most users, 
jobs are small scale and the {{ShuffleDescriptors}} cache can be very small and 
will not be shipped via blobs, so a large limit will not cause new issues 
(compared to the current state, where there is no limit).
2. For small scale jobs, the {{ShuffleDescriptors}} cache will not be shipped 
via blobs, so residue problems will not get worse. Even for large scale jobs, 
IIRC, the compressed {{ShuffleDescriptors}} cache of an 8000x8000 shuffle is 
200k+ bytes, which still does not exceed the 1MB blob offloading threshold. 
Therefore I think we can document the configuration "blob.offload.minsize" to 
make users aware of the residuals and the blob size limit.




> Distribute the ShuffleDescriptors via blob server
> -
>
> Key: FLINK-23218
> URL: https://issues.apache.org/jira/browse/FLINK-23218
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhilong Hong
>Priority: Major
> Fix For: 1.14.0
>
>
> h3. Introduction
> The optimizations introduced in FLINK-21110 so far have improved the 
> performance of job initialization, failover and partition releasing. 
> However, task deployment is still slow. For a job with two vertices, each 
> with a parallelism of 8k and connected by an all-to-all edge, it takes 
> 32.611s to deploy all the tasks and make them transition to running. If the 
> parallelisms are 16k, it may take more than 2 minutes.
> As the creation of TaskDeploymentDescriptors runs in the main thread of the 
> jobmanager, the jobmanager cannot deal with other Akka messages, such as 
> heartbeats and task status updates, for more than two minutes.
>  
> All in all, currently there are two issues in the deployment of tasks for 
> large scale jobs:
>  # It takes a long time to deploy tasks, especially for all-to-all edges.
>  # Heartbeat timeout may happen during or after the procedure of task 
> deployments. For the streaming job, it would cause the failover of the entire 
> region. The job may never transition to running since there would be another 
> heartbeat timeout during the procedure of new task deployments.
> h3. Proposal
> Task deployment involves the following procedures:
>  # Jobmanager creates TaskDeploymentDescriptor for each task in the main 
> thread
>  # TaskDeploymentDescriptor is serialized in the future executor
>  # Akka transports TaskDeploymentDescriptors to TaskExecutors via RPC call
>  # TaskExecutors create a new task thread and execute it
> The optimization contains two parts:
> *1. Cache the compressed serialized value of ShuffleDescriptors*
> ShuffleDescriptors in the TaskDeploymentDescriptor are used to describe the 
> IntermediateResultPartitions that a task consumes. For the downstream 
> vertices connected with the all-to-all edge that has _N_ parallelism, we need 
> to calculate _N_ ShuffleDescriptors for _N_ times. However, for these 
> vertices, they share the same ShuffleDescriptors since they all consume the 
> same IntermediateResultPartitions. We don't need to calculate 
> ShuffleDescriptors for each downstream vertex individually. We can just cache 
> them. This will decrease the overall complexity of calculating 
> TaskDeploymentDescriptors from O(N^2) to O(N).
> Furthermore, we don't need to serialize the same ShuffleDescriptors _N_ 
> times, so we can just cache the serialized value of ShuffleDescriptors 
> instead of the original objects. To decrease the size of Akka messages and 
> reduce the transmission of replicated data over the network, these serialized 
> values can be compressed.
> *2. Distribute the ShuffleDescriptors via blob server*
> For ShuffleDescriptors of vertices with 8k parallelism, the size of their 
> serialized value is more than 700 Kilobytes. After the compression, it would 
> be 200 Kilobytes or so. The overall size of 8k TaskDeploymentDescriptors is 
> more than 1.6 Gigabytes. Since Akka cannot send the messages as fast as the 
> TaskDeploymentDescriptors are created, these TaskDeploymentDescriptors would 
> become a heavy burden for the garbage collector to deal with.
> In TaskDeploymentDescriptor, JobInformation and TaskInformation are 
> distributed via the blob server if their sizes exceed a certain threshold 
> (which is defined as {{blob.offload.minsize}}). TaskExecutors request the 
> information from the blob server once they begin to process the 
> 

[jira] [Closed] (FLINK-15031) Automatically calculate required network memory for fine-grained jobs

2021-07-06 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-15031.
---
Fix Version/s: (was: 1.12.0)
   1.14.0
   Resolution: Fixed

Done via 
2c52816540e0d5b7c1b3ccdfa35cadc2d026e25b
e32f0f82164512f632a533fad01ead6a12ac8152

> Automatically calculate required network memory for fine-grained jobs
> -
>
> Key: FLINK-15031
> URL: https://issues.apache.org/jira/browse/FLINK-15031
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Assignee: Jin Xing
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In cases where resources are specified, we expect each operator to declare 
> required resources before using them. In this way, no resource-related error 
> should happen if resources are not used beyond what was declared. This 
> ensures a deployed task would not fail due to insufficient resources in TM, 
> which may result in unnecessary failures and may even cause a job hanging 
> forever, failing repeatedly on deploying tasks to a TM with insufficient 
> resources.
> Shuffle memory is the last missing piece for this goal at the moment. A 
> minimum number of network buffers is required for tasks to work. Currently a 
> task may be deployed to a TM with insufficient network buffers and fail on 
> launching.
> To avoid that, we should calculate required network memory for a 
> task/SlotSharingGroup before allocating a slot for it.
> The required shuffle memory can be derived from the number of required 
> network buffers. The number of buffers required by a task (ExecutionVertex) is
> {code:java}
> exclusive buffers for input channels (i.e. numInputChannel * 
> buffersPerChannel) + required buffers for the result partition buffer 
> pool (currently numberOfSubpartitions + 1)
> {code}
> Note that this is for the {{NettyShuffleService}} case. For custom shuffle 
> services, currently there is no way to get the required shuffle memory of a 
> task.
> To make it simple under dynamic slot sharing, the required shuffle memory for 
> a task should be the max required shuffle memory of all {{ExecutionVertex}} 
> of the same {{ExecutionJobVertex}}. And the required shuffle memory for a 
> slot sharing group should be the sum of shuffle memory for each 
> {{ExecutionJobVertex}} instance within.
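
A minimal sketch of the buffer accounting described above for the
NettyShuffleService case: per-vertex buffers follow the formula in the code
block, the per-ExecutionJobVertex requirement takes the max over its vertices,
and the slot sharing group requirement sums the job vertex requirements. All
names and the example numbers are hypothetical.

{code:java}
import java.util.List;

public class NetworkBufferCalcSketch {

    /**
     * Buffers required by one ExecutionVertex: exclusive input buffers plus the
     * result partition's buffer pool (numberOfSubpartitions + 1).
     */
    static int buffersForVertex(int numInputChannels, int buffersPerChannel, int numSubpartitions) {
        return numInputChannels * buffersPerChannel + numSubpartitions + 1;
    }

    /**
     * Per ExecutionJobVertex: take the max over its ExecutionVertices, so the estimate
     * holds no matter which vertex instance ends up in the slot. Each entry is a triple
     * {numInputChannels, buffersPerChannel, numSubpartitions}.
     */
    static int buffersForJobVertex(List<int[]> vertexTriples) {
        return vertexTriples.stream()
                .mapToInt(v -> buffersForVertex(v[0], v[1], v[2]))
                .max()
                .orElse(0);
    }

    /** Per slot sharing group: sum the requirements of all job vertex instances sharing the slot. */
    static int buffersForSlotSharingGroup(List<Integer> jobVertexRequirements) {
        return jobVertexRequirements.stream().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        // Example: a vertex with 100 input channels, 2 exclusive buffers per channel,
        // producing a result partition with 50 subpartitions.
        int perVertex = buffersForVertex(100, 2, 50); // 100 * 2 + 50 + 1 = 251
        System.out.println("buffers per vertex = " + perVertex);
    }
}
{code}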



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23262) FileReadingWatermarkITCase.testWatermarkEmissionWithChaining fails on azure

2021-07-06 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17375376#comment-17375376
 ] 

Zhu Zhu commented on FLINK-23262:
-

another instance:
https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_apis/build/builds/19956/logs/104

> FileReadingWatermarkITCase.testWatermarkEmissionWithChaining fails on azure
> ---
>
> Key: FLINK-23262
> URL: https://issues.apache.org/jira/browse/FLINK-23262
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.14.0
>Reporter: Xintong Song
>Priority: Major
>  Labels: test-stability
> Fix For: 1.14.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=19942&view=logs&j=219e462f-e75e-506c-3671-5017d866ccf6&t=4c5dc768-5c82-5ab0-660d-086cb90b76a0&l=5584
> {code}
> Jul 05 22:19:00 [ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, 
> Time elapsed: 4.334 s <<< FAILURE! - in 
> org.apache.flink.test.streaming.api.FileReadingWatermarkITCase
> Jul 05 22:19:00 [ERROR] 
> testWatermarkEmissionWithChaining(org.apache.flink.test.streaming.api.FileReadingWatermarkITCase)
>   Time elapsed: 4.16 s  <<< FAILURE!
> Jul 05 22:19:00 java.lang.AssertionError: too few watermarks emitted: 4
> Jul 05 22:19:00   at org.junit.Assert.fail(Assert.java:89)
> Jul 05 22:19:00   at org.junit.Assert.assertTrue(Assert.java:42)
> Jul 05 22:19:00   at 
> org.apache.flink.test.streaming.api.FileReadingWatermarkITCase.testWatermarkEmissionWithChaining(FileReadingWatermarkITCase.java:65)
> Jul 05 22:19:00   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Jul 05 22:19:00   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Jul 05 22:19:00   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Jul 05 22:19:00   at java.lang.reflect.Method.invoke(Method.java:498)
> Jul 05 22:19:00   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Jul 05 22:19:00   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Jul 05 22:19:00   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Jul 05 22:19:00   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Jul 05 22:19:00   at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> Jul 05 22:19:00   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> Jul 05 22:19:00   at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> Jul 05 22:19:00   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> Jul 05 22:19:00   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> Jul 05 22:19:00   at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> Jul 05 22:19:00   at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> Jul 05 22:19:00   at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> Jul 05 22:19:00   at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> Jul 05 22:19:00   at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> Jul 05 22:19:00   at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> Jul 05 22:19:00   at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> Jul 05 22:19:00   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
> Jul 05 22:19:00   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
> Jul 05 22:19:00   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
> Jul 05 22:19:00   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
> Jul 05 22:19:00   at 
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
> Jul 05 22:19:00   at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
> Jul 05 22:19:00   at 
> org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
> Jul 05 22:19:00   at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22677) Scheduler should invoke ShuffleMaster#registerPartitionWithProducer by a real asynchronous fashion

2021-07-05 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17375218#comment-17375218
 ] 

Zhu Zhu commented on FLINK-22677:
-

One thing to mention is that I did not change {{AdaptiveScheduler}} to support 
async partition registration, because otherwise we would need to introduce an 
async callback process to {{AdaptiveScheduler}}, which was intentionally 
avoided. Given that a customized {{ShuffleMaster}} is mainly meant for batch 
jobs using an external shuffle service, and reactive mode is for streaming jobs 
only, I think for reactive mode we can keep the assumption that the 
{{ShuffleMaster}} will complete partition registration immediately. 
WDYT? [~trohrmann]
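
For illustration, a minimal sketch of the immediate-completion assumption 
described above; the interface and class below are simplified stand-ins, not 
the real Flink {{ShuffleMaster}} API:

{code:java}
// Sketch of the assumption above: a shuffle master whose partition
// registration completes immediately, so reactive mode never has to wait on
// the returned future.
import java.util.concurrent.CompletableFuture;

interface RegistrationSketch<D> {
    CompletableFuture<D> registerPartitionWithProducer(Object partitionDescriptor, Object producerDescriptor);
}

class ImmediatelyCompletingShuffleMasterSketch implements RegistrationSketch<String> {
    @Override
    public CompletableFuture<String> registerPartitionWithProducer(
            Object partitionDescriptor, Object producerDescriptor) {
        // Registration is local bookkeeping only, so the future is already
        // complete when it is returned to the caller.
        return CompletableFuture.completedFuture("shuffle-descriptor-for-" + partitionDescriptor);
    }
}
{code}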

> Scheduler should invoke ShuffleMaster#registerPartitionWithProducer by a real 
> asynchronous fashion
> --
>
> Key: FLINK-22677
> URL: https://issues.apache.org/jira/browse/FLINK-22677
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0
>Reporter: Jin Xing
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> Current scheduler enforces a synchronous registration though the API of 
> ShuffleMaster#registerPartitionWithProducer returns a CompletableFuture. In 
> scenario of remote shuffle service, the talk between ShuffleMaster and remote 
> cluster tends to be expensive. A synchronous registration risks to block main 
> thread potentially and might cause negative side effects like heartbeat 
> timeout. Additionally, expensive synchronous invokes to remote could 
> bottleneck the throughput for applying shuffle resource, especially for batch 
> jobs with complicated DAGs;
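
To illustrate the blocking risk described above, a minimal sketch contrasting a 
blocking call with asynchronous composition; the class and method names are 
illustrative, not the scheduler's actual code:

{code:java}
// Joining the registration future on the JobMaster main thread blocks it for
// as long as the remote ShuffleMaster takes, while asynchronous composition
// keeps the main thread free for heartbeats and other RPCs.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

class RegistrationCallStyleSketch {

    void synchronousStyle(CompletableFuture<Object> registration, Runnable deploy) {
        registration.join(); // blocks the calling thread until the remote call returns
        deploy.run();
    }

    void asynchronousStyle(
            CompletableFuture<Object> registration, Runnable deploy, Executor mainThreadExecutor) {
        // The callback is handed back to the main thread executor once the
        // registration completes; the main thread itself never blocks.
        registration.thenRunAsync(deploy, mainThreadExecutor);
    }
}
{code}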



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22677) Scheduler should invoke ShuffleMaster#registerPartitionWithProducer by a real asynchronous fashion

2021-07-05 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17375217#comment-17375217
 ] 

Zhu Zhu commented on FLINK-22677:
-

The problems below could happen if partition registration is made async:
1. A task is deployed before its produced partitions are registered. This can 
result in a bad {{ResultPartitionDeploymentDescriptor}}.
2. A consumer task is deployed before its consumed partitions are registered, 
if the consumer and producer are in the same region.
3. Partitions can be leaked in {{JobMasterPartitionTracker}} if the producer 
task is failed/canceled before registration is done.
4. Partitions can be leaked in the shuffle service if the producer task is 
failed/canceled before registration is done.

To solve these problems, I'd propose to:
1. Let DefaultScheduler deploy a task only after all its partitions have 
completed registration. This solves problems #1 and #2. #2 is solved because we 
now always deploy in topological order, which means that when a task is 
deployed, all its upstream tasks are deployed and their partitions are 
registered already.
2. Check the producer state on partition registration completion. If the 
producer is not {{SCHEDULED}}, directly ask the {{ShuffleMaster}} to release 
the partition and do not track it.

A PR implementing the proposal above has been opened.
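
A minimal sketch of the proposed behavior, assuming simplified types; the 
class, method, and parameter names are illustrative and not the actual 
{{DefaultScheduler}} code:

{code:java}
// (1) deploy a task only after all of its partition registrations have
// completed; (2) on completion, release and do not track the partitions if the
// producer is no longer SCHEDULED.
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Supplier;

class DeployAfterRegistrationSketch {

    enum ExecutionState { SCHEDULED, CANCELED, FAILED }

    void registerPartitionsAndDeploy(
            List<CompletableFuture<Object>> registrations,
            Supplier<ExecutionState> producerState,
            Runnable deployTask,
            Consumer<Object> releasePartition) {

        CompletableFuture.allOf(registrations.toArray(new CompletableFuture[0]))
                .thenRun(() -> {
                    if (producerState.get() == ExecutionState.SCHEDULED) {
                        // All produced partitions are registered, so the task's
                        // ResultPartitionDeploymentDescriptors are complete.
                        deployTask.run();
                    } else {
                        // The producer was canceled/failed before registration
                        // finished: release the registered partitions instead
                        // of tracking them, so neither the shuffle service nor
                        // the partition tracker leaks them.
                        registrations.forEach(f -> f.thenAccept(releasePartition));
                    }
                });
    }
}
{code}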

> Scheduler should invoke ShuffleMaster#registerPartitionWithProducer by a real 
> asynchronous fashion
> --
>
> Key: FLINK-22677
> URL: https://issues.apache.org/jira/browse/FLINK-22677
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0
>Reporter: Jin Xing
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> Current scheduler enforces a synchronous registration though the API of 
> ShuffleMaster#registerPartitionWithProducer returns a CompletableFuture. In 
> scenario of remote shuffle service, the talk between ShuffleMaster and remote 
> cluster tends to be expensive. A synchronous registration risks to block main 
> thread potentially and might cause negative side effects like heartbeat 
> timeout. Additionally, expensive synchronous invokes to remote could 
> bottleneck the throughput for applying shuffle resource, especially for batch 
> jobs with complicated DAGs;



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22677) Scheduler should invoke ShuffleMaster#registerPartitionWithProducer by a real asynchronous fashion

2021-07-02 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-22677:
---

Assignee: Zhu Zhu

> Scheduler should invoke ShuffleMaster#registerPartitionWithProducer by a real 
> asynchronous fashion
> --
>
> Key: FLINK-22677
> URL: https://issues.apache.org/jira/browse/FLINK-22677
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Jin Xing
>Assignee: Zhu Zhu
>Priority: Major
>
> Current scheduler enforces a synchronous registration though the API of 
> ShuffleMaster#registerPartitionWithProducer returns a CompletableFuture. In 
> scenario of remote shuffle service, the talk between ShuffleMaster and remote 
> cluster tends to be expensive. A synchronous registration risks to block main 
> thread potentially and might cause negative side effects like heartbeat 
> timeout. Additionally, expensive synchronous invokes to remote could 
> bottleneck the throughput for applying shuffle resource, especially for batch 
> jobs with complicated DAGs;



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22677) Scheduler should invoke ShuffleMaster#registerPartitionWithProducer by a real asynchronous fashion

2021-07-02 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-22677:

Affects Version/s: 1.14.0

> Scheduler should invoke ShuffleMaster#registerPartitionWithProducer by a real 
> asynchronous fashion
> --
>
> Key: FLINK-22677
> URL: https://issues.apache.org/jira/browse/FLINK-22677
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0
>Reporter: Jin Xing
>Assignee: Zhu Zhu
>Priority: Major
>
> Current scheduler enforces a synchronous registration though the API of 
> ShuffleMaster#registerPartitionWithProducer returns a CompletableFuture. In 
> scenario of remote shuffle service, the talk between ShuffleMaster and remote 
> cluster tends to be expensive. A synchronous registration risks to block main 
> thread potentially and might cause negative side effects like heartbeat 
> timeout. Additionally, expensive synchronous invokes to remote could 
> bottleneck the throughput for applying shuffle resource, especially for batch 
> jobs with complicated DAGs;



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22677) Scheduler should invoke ShuffleMaster#registerPartitionWithProducer by a real asynchronous fashion

2021-07-02 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-22677:

Fix Version/s: 1.14.0

> Scheduler should invoke ShuffleMaster#registerPartitionWithProducer by a real 
> asynchronous fashion
> --
>
> Key: FLINK-22677
> URL: https://issues.apache.org/jira/browse/FLINK-22677
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0
>Reporter: Jin Xing
>Assignee: Zhu Zhu
>Priority: Major
> Fix For: 1.14.0
>
>
> Current scheduler enforces a synchronous registration though the API of 
> ShuffleMaster#registerPartitionWithProducer returns a CompletableFuture. In 
> scenario of remote shuffle service, the talk between ShuffleMaster and remote 
> cluster tends to be expensive. A synchronous registration risks to block main 
> thread potentially and might cause negative side effects like heartbeat 
> timeout. Additionally, expensive synchronous invokes to remote could 
> bottleneck the throughput for applying shuffle resource, especially for batch 
> jobs with complicated DAGs;



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-22945) StackOverflowException can happen when a large scale job is CANCELING/FAILING

2021-06-30 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-22945.
---
Resolution: Fixed

Fixed via:
master: 5badc356abdcbb3d5cae1fe3f00f1ec18f414d98
1.13: 2d229fc6521b4fc924a4a66347d71b72a1455f77


> StackOverflowException can happen when a large scale job is CANCELING/FAILING
> -
>
> Key: FLINK-22945
> URL: https://issues.apache.org/jira/browse/FLINK-22945
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.13.1, 1.12.4
>Reporter: Zhu Zhu
>Assignee: Gen Luo
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.14.0, 1.13.2
>
>
> The pending requests in ExecutionSlotAllocator are not cleared when a job 
> transitions to CANCELING or FAILING, while all vertices will be canceled and 
> assigned slot will be returned. The returned slot is possible to be used to 
> fulfill the pending request of a CANCELED vertex and the assignment will fail 
> immediately and the slot will be returned and used to fulfilled another 
> vertex in a recursive way. StackOverflow can happen in this way when there 
> are many vertices, and fatal error can happen and lead to JM will crash. A 
> sample call stack is attached below.
> To fix this problem, we should clear the pending requests in 
> ExecutionSlotAllocator when a job is CANCELING or FAILING. Besides that, I 
> think it's better to also improve the call stack of slot assignment to avoid 
> similar StackOverflowException to occur.
> ...
>   at 
> org.apache.flink.runtime.scheduler.SharedSlot.returnLogicalSlot(SharedSlot.java:234)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$returnSlotToOwner$0(SingleLogicalSlot.java:203)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705) 
> ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:717)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2010) 
> ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.returnSlotToOwner(SingleLogicalSlot.java:200)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.releaseSlotIfPresent(DefaultScheduler.java:533)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$8(DefaultScheduler.java:512)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
> ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) 
> ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequest.fulfill(DeclarativeSlotPoolBridge.java:552)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequestSlotMatching.fulfillPendingRequest(DeclarativeSlotPoolBridge.java:587)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.newSlotsAreAvailable(DeclarativeSlotPoolBridge.java:171)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.lambda$freeReservedSlot$0(DefaultDeclarativeSlotPool.java:316)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeReservedSlot(DefaultDeclarativeSlotPool.java:313)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.releaseSlot(DeclarativeSlotPoolBridge.java:335)
>  ~[flink-dist_2.11-1.13-vvr-4
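
A minimal sketch of the two mitigations described above, under simplified 
assumptions; all names are illustrative and this is not the actual slot pool 
code:

{code:java}
// Mitigation 1: clear the pending slot requests when the job starts
// CANCELING/FAILING. Mitigation 2: hand returned slots to waiting requests
// with a loop instead of recursive completion callbacks, so the call stack
// stays flat no matter how many vertices are involved.
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

class PendingSlotRequestsSketch<S> {

    private final Deque<Consumer<S>> pendingRequests = new ArrayDeque<>();
    private final Deque<S> returnedSlots = new ArrayDeque<>();
    private boolean draining;

    // Drop all pending requests once the job is CANCELING or FAILING, so
    // returned slots are never re-assigned to already-canceled vertices.
    void onJobCancelingOrFailing() {
        pendingRequests.clear();
    }

    // Queue returned slots and drain them iteratively. A request that returns
    // its slot from inside its own callback only enqueues it; the outermost
    // call does the draining, keeping recursion depth constant.
    void onSlotReturned(S slot) {
        returnedSlots.add(slot);
        if (draining) {
            return;
        }
        draining = true;
        try {
            S next;
            while ((next = returnedSlots.poll()) != null) {
                Consumer<S> request = pendingRequests.poll();
                if (request != null) {
                    request.accept(next);
                }
            }
        } finally {
            draining = false;
        }
    }
}
{code}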

[jira] [Updated] (FLINK-23172) Links of restart strategy in configuration page is broken

2021-06-29 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-23172:

Priority: Major  (was: Minor)

> Links of restart strategy in configuration page is broken
> -
>
> Key: FLINK-23172
> URL: https://issues.apache.org/jira/browse/FLINK-23172
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.14.0
>Reporter: Zhilong Hong
>Priority: Major
> Fix For: 1.14.0
>
>
> The links in Fault Tolerance section of [the configuration 
> page|https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#fault-tolerance/]
>  is broken. Currently the link refers to 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/dev/task_failure_recovery.html#fixed-delay-restart-strategy,
>  which doesn't exist and would head to 404 error. The correct link is 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/task_failure_recovery/#fixed-delay-restart-strategy.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23172) Links of restart strategy in configuration page is broken

2021-06-29 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-23172:

Issue Type: Bug  (was: Technical Debt)

> Links of restart strategy in configuration page is broken
> -
>
> Key: FLINK-23172
> URL: https://issues.apache.org/jira/browse/FLINK-23172
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.14.0
>Reporter: Zhilong Hong
>Priority: Minor
> Fix For: 1.14.0
>
>
> The links in Fault Tolerance section of [the configuration 
> page|https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#fault-tolerance/]
>  is broken. Currently the link refers to 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/dev/task_failure_recovery.html#fixed-delay-restart-strategy,
>  which doesn't exist and would head to 404 error. The correct link is 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/task_failure_recovery/#fixed-delay-restart-strategy.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-23078) Scheduler Benchmarks not compiling

2021-06-29 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23078?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-23078.
---
Resolution: Fixed

Fixed via
flink:
439dbfa48122df164780f55da2cb05f64669a247
49fafcaa62cd34a468b6efc14bf49c53f860d7ce

flink-benchmarks:
e44f22bfa314c08b5a15cea932b84a848a6975ec
1331a9d73255b277d3c37bf9f222bfd0c968393b

> Scheduler Benchmarks not compiling
> --
>
> Key: FLINK-23078
> URL: https://issues.apache.org/jira/browse/FLINK-23078
> Project: Flink
>  Issue Type: Bug
>  Components: Benchmarks, Runtime / Coordination
>Affects Versions: 1.14.0
>Reporter: Piotr Nowojski
>Assignee: Zhilong Hong
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> {code:java}
> 07:46:50  [ERROR] 
> /home/jenkins/workspace/flink-master-benchmarks/flink-benchmarks/src/main/java/org/apache/flink/scheduler/benchmark/SchedulerBenchmarkBase.java:21:44:
>   error: cannot find symbol
> {code}
> CC [~chesnay] [~Thesharing]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-15031) Automatically calculate required network memory for fine-grained jobs

2021-06-29 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17371290#comment-17371290
 ] 

Zhu Zhu edited comment on FLINK-15031 at 6/29/21, 10:52 AM:


Discussed with [~trohrmann] offline. His concern was that the network 
configuration can differ between TMs, and this inconsistency can lead to the 
announced network memory differing from what is actually used. To solve this 
problem, we think in the first version we should clearly state in the documents 
that JM/RM/TMs are required to have the same configuration.

Another thing we should be aware of is that the network memory usage code may 
change and result in an inconsistency between the requirement and the usage. To 
avoid this problem, we should extract the current network memory usage 
calculation so that it is shared by the requirement calculation and the actual 
usage.

Regarding the fraction style config, it actually refers to floating buffers 
beyond the minimum requirements. That is a bit too hard for users (even 
advanced users) to understand. Given that automatically calculating the 
required network memory is a new feature, we think it's fine in the first 
version to always include enough extra floating buffers in the requirement, to 
avoid performance regression and buffer request timeouts, and then see whether 
users complain about the network memory requirements.

WDYT? [~jinxing6...@126.com]  [~karmagyz]  [~xintongsong]


was (Author: zhuzh):
Discussed with Till offline. His concern was that the network configuration can 
differ between TMs, and this inconsistency can lead to the announced network 
memory differing from what is actually used. To solve this problem, we think in 
the first version we should clearly state in the documents that JM/RM/TMs are 
required to have the same configuration.

Another thing we should be aware of is that the network memory usage code may 
change and result in an inconsistency between the requirement and the usage. To 
avoid this problem, we should extract the current network memory usage 
calculation so that it is shared by the requirement calculation and the actual 
usage.

Regarding the fraction style config, it actually refers to floating buffers 
beyond the minimum requirements. That is a bit too hard for users (even 
advanced users) to understand. Given that automatically calculating the 
required network memory is a new feature, we think it's fine in the first 
version to always include enough extra floating buffers in the requirement, to 
avoid performance regression and buffer request timeouts, and then see whether 
users complain about the network memory requirements.

WDYT? [~jinxing6...@126.com][~karmagyz][~xintongsong]

> Automatically calculate required network memory for fine-grained jobs
> -
>
> Key: FLINK-15031
> URL: https://issues.apache.org/jira/browse/FLINK-15031
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Assignee: Jin Xing
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In cases where resources are specified, we expect each operator to declare 
> required resources before using them. In this way, no resource related error 
> should happen if resources are not used beyond what was declared. This 
> ensures a deployed task would not fail due to insufficient resources in TM, 
> which may result in unnecessary failures and may even cause a job hanging 
> forever, failing repeatedly on deploying tasks to a TM with insufficient 
> resources.
> Shuffle memory is the last missing piece for this goal at the moment. Minimum 
> network buffers are required by tasks to work. Currently a task is possible 
> to be deployed to a TM with insufficient network buffers, and fails on 
> launching.
> To avoid that, we should calculate required network memory for a 
> task/SlotSharingGroup before allocating a slot for it.
> The required shuffle memory can be derived from the number of required 
> network buffers. The number of buffers required by a task (ExecutionVertex) is
> {code:java}
> exclusive buffers for input channels(i.e. numInputChannel * 
> buffersPerChannel) + required buffers for result partition buffer 
> pool(currently is numberOfSubpartitions + 1)
> {code}
> Note that this is for the {{NettyShuffleService}} case. For custom shuffle 
> services, currently there is no way to get the required shuffle memory of a 
> task.
> To make it simple under dynamic slot sharing, the required shuffle memory for 
> a task should be the max required shuffle memory of all {{ExecutionVertex}} 
> of the same {{ExecutionJobVertex}}. And the required shuffle memory for a 
> slot sharing group should be the sum of shuffle memory for each 
> {{ExecutionJobVe

[jira] [Commented] (FLINK-15031) Automatically calculate required network memory for fine-grained jobs

2021-06-29 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17371290#comment-17371290
 ] 

Zhu Zhu commented on FLINK-15031:
-

Discussed with Till offline. His concern was that the network configuration can 
differ between TMs, and this inconsistency can lead to the announced network 
memory differing from what is actually used. To solve this problem, we think in 
the first version we should clearly state in the documents that JM/RM/TMs are 
required to have the same configuration.

Another thing we should be aware of is that the network memory usage code may 
change and result in an inconsistency between the requirement and the usage. To 
avoid this problem, we should extract the current network memory usage 
calculation so that it is shared by the requirement calculation and the actual 
usage.

Regarding the fraction style config, it actually refers to floating buffers 
beyond the minimum requirements. That is a bit too hard for users (even 
advanced users) to understand. Given that automatically calculating the 
required network memory is a new feature, we think it's fine in the first 
version to always include enough extra floating buffers in the requirement, to 
avoid performance regression and buffer request timeouts, and then see whether 
users complain about the network memory requirements.

WDYT? [~jinxing6...@126.com][~karmagyz][~xintongsong]

> Automatically calculate required network memory for fine-grained jobs
> -
>
> Key: FLINK-15031
> URL: https://issues.apache.org/jira/browse/FLINK-15031
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Assignee: Jin Xing
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In cases where resources are specified, we expect each operator to declare 
> required resources before using them. In this way, no resource related error 
> should happen if resources are not used beyond what was declared. This 
> ensures a deployed task would not fail due to insufficient resources in TM, 
> which may result in unnecessary failures and may even cause a job hanging 
> forever, failing repeatedly on deploying tasks to a TM with insufficient 
> resources.
> Shuffle memory is the last missing piece for this goal at the moment. Minimum 
> network buffers are required by tasks to work. Currently a task is possible 
> to be deployed to a TM with insufficient network buffers, and fails on 
> launching.
> To avoid that, we should calculate required network memory for a 
> task/SlotSharingGroup before allocating a slot for it.
> The required shuffle memory can be derived from the number of required 
> network buffers. The number of buffers required by a task (ExecutionVertex) is
> {code:java}
> exclusive buffers for input channels(i.e. numInputChannel * 
> buffersPerChannel) + required buffers for result partition buffer 
> pool(currently is numberOfSubpartitions + 1)
> {code}
> Note that this is for the {{NettyShuffleService}} case. For custom shuffle 
> services, currently there is no way to get the required shuffle memory of a 
> task.
> To make it simple under dynamic slot sharing, the required shuffle memory for 
> a task should be the max required shuffle memory of all {{ExecutionVertex}} 
> of the same {{ExecutionJobVertex}}. And the required shuffle memory for a 
> slot sharing group should be the sum of shuffle memory for each 
> {{ExecutionJobVertex}} instance within.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-15031) Automatically calculate required network memory for fine-grained jobs

2021-06-29 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17371207#comment-17371207
 ] 

Zhu Zhu edited comment on FLINK-15031 at 6/29/21, 8:18 AM:
---

I think it should be an advanced config and even experimental in the first 
step. It can be 1.0 by default, and only when users find large network memory 
requirements to be a real pain would they start to tune this config. In that 
case, they must understand what this config means and what floating buffers 
are. Floating buffers are already exposed via configs like 
`taskmanager.network.memory.floating-buffers-per-gate`, so I think it's not a 
big problem for advanced users to understand it.

Regarding the name of the config, I think that in the future, automatically 
announcing network memory could also serve UNKNOWN resource scenarios. 
Conceptually it should work, and the only blocker I can see is that a partially 
UNKNOWN ResourceProfile is not supported at the moment. Therefore, I think we 
do not need to bind this config to fine-grained scenarios, but we can add an 
`auto-announcing` prefix to it. Maybe 
`taskmanager.network.memory.auto-announcing.fraction-to-announce-for-floating-buffers`?
 In the future we can introduce another config, 
`taskmanager.network.memory.auto-announcing.enabled`, which would allow 
enabling automatic announcement of network memory even in the UNKNOWN resources 
scenario.

WDYT? [~trohrmann]


was (Author: zhuzh):
I think it should be an advanced and experimental config. It can be 1.0 by 
default, and only when users find the network memory consumption to be a real 
pain would they start to tune this config. In that case, they must understand 
what this config means and what floating buffers are. Floating buffers are 
already exposed via configs like 
`taskmanager.network.memory.floating-buffers-per-gate`, so I think it's not a 
big problem for advanced users to understand it.

Regarding the name of the config, I think that in the future, automatically 
announcing network memory could also serve UNKNOWN resource scenarios. 
Conceptually it should work, and the only blocker I can see is that a partially 
UNKNOWN ResourceProfile is not supported at the moment. Therefore, I think we 
do not need to bind this config to fine-grained scenarios, but we can add an 
`auto-announcing` prefix to it. Maybe 
`taskmanager.network.memory.auto-announcing.fraction-to-announce-for-floating-buffers`?
 In the future we can introduce another config, 
`taskmanager.network.memory.auto-announcing.enabled`, which would allow 
enabling automatic announcement of network memory even in the UNKNOWN resources 
scenario.

WDYT? [~trohrmann]

> Automatically calculate required network memory for fine-grained jobs
> -
>
> Key: FLINK-15031
> URL: https://issues.apache.org/jira/browse/FLINK-15031
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Assignee: Jin Xing
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In cases where resources are specified, we expect each operator to declare 
> required resources before using them. In this way, no resource related error 
> should happen if resources are not used beyond what was declared. This 
> ensures a deployed task would not fail due to insufficient resources in TM, 
> which may result in unnecessary failures and may even cause a job hanging 
> forever, failing repeatedly on deploying tasks to a TM with insufficient 
> resources.
> Shuffle memory is the last missing piece for this goal at the moment. Minimum 
> network buffers are required by tasks to work. Currently a task is possible 
> to be deployed to a TM with insufficient network buffers, and fails on 
> launching.
> To avoid that, we should calculate required network memory for a 
> task/SlotSharingGroup before allocating a slot for it.
> The required shuffle memory can be derived from the number of required 
> network buffers. The number of buffers required by a task (ExecutionVertex) is
> {code:java}
> exclusive buffers for input channels(i.e. numInputChannel * 
> buffersPerChannel) + required buffers for result partition buffer 
> pool(currently is numberOfSubpartitions + 1)
> {code}
> Note that this is for the {{NettyShuffleService}} case. For custom shuffle 
> services, currently there is no way to get the required shuffle memory of a 
> task.
> To make it simple under dynamic slot sharing, the required shuffle memory for 
> a task should be the max required shuffle memory of all {{ExecutionVertex}} 
> of the same {{ExecutionJobVertex}}. And the required shuffle memory for a 
> slot 

[jira] [Commented] (FLINK-15031) Automatically calculate required network memory for fine-grained jobs

2021-06-29 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17371207#comment-17371207
 ] 

Zhu Zhu commented on FLINK-15031:
-

I think it should be an advanced and experimental config. It can be 1.0 by 
default, and only when users find the network memory consumption to be a real 
pain would they start to tune this config. In that case, they must understand 
what this config means and what floating buffers are. Floating buffers are 
already exposed via configs like 
`taskmanager.network.memory.floating-buffers-per-gate`, so I think it's not a 
big problem for advanced users to understand it.

Regarding the name of the config, I think that in the future, automatically 
announcing network memory could also serve UNKNOWN resource scenarios. 
Conceptually it should work, and the only blocker I can see is that a partially 
UNKNOWN ResourceProfile is not supported at the moment. Therefore, I think we 
do not need to bind this config to fine-grained scenarios, but we can add an 
`auto-announcing` prefix to it. Maybe 
`taskmanager.network.memory.auto-announcing.fraction-to-announce-for-floating-buffers`?
 In the future we can introduce another config, 
`taskmanager.network.memory.auto-announcing.enabled`, which would allow 
enabling automatic announcement of network memory even in the UNKNOWN resources 
scenario.

WDYT? [~trohrmann]

> Automatically calculate required network memory for fine-grained jobs
> -
>
> Key: FLINK-15031
> URL: https://issues.apache.org/jira/browse/FLINK-15031
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Assignee: Jin Xing
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In cases where resources are specified, we expect each operator to declare 
> required resources before using them. In this way, no resource related error 
> should happen if resources are not used beyond what was declared. This 
> ensures a deployed task would not fail due to insufficient resources in TM, 
> which may result in unnecessary failures and may even cause a job hanging 
> forever, failing repeatedly on deploying tasks to a TM with insufficient 
> resources.
> Shuffle memory is the last missing piece for this goal at the moment. Minimum 
> network buffers are required by tasks to work. Currently a task is possible 
> to be deployed to a TM with insufficient network buffers, and fails on 
> launching.
> To avoid that, we should calculate required network memory for a 
> task/SlotSharingGroup before allocating a slot for it.
> The required shuffle memory can be derived from the number of required 
> network buffers. The number of buffers required by a task (ExecutionVertex) is
> {code:java}
> exclusive buffers for input channels(i.e. numInputChannel * 
> buffersPerChannel) + required buffers for result partition buffer 
> pool(currently is numberOfSubpartitions + 1)
> {code}
> Note that this is for the {{NettyShuffleService}} case. For custom shuffle 
> services, currently there is no way to get the required shuffle memory of a 
> task.
> To make it simple under dynamic slot sharing, the required shuffle memory for 
> a task should be the max required shuffle memory of all {{ExecutionVertex}} 
> of the same {{ExecutionJobVertex}}. And the required shuffle memory for a 
> slot sharing group should be the sum of shuffle memory for each 
> {{ExecutionJobVertex}} instance within.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15031) Automatically calculate required network memory for fine-grained jobs

2021-06-28 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17370577#comment-17370577
 ] 

Zhu Zhu commented on FLINK-15031:
-

Thanks for reviving this discussion! 
This improvement is necessary for fine-grained jobs and can benefit users a lot.
Regarding whether to include floating buffers in the announced network memory, 
my main concern is that it could result in a doubled network memory 
requirement. That can be a pain point for resource-sensitive users, especially 
for large scale jobs which would require terabytes of network memory. 
I'm thinking maybe we can introduce a fraction style config option which 
indicates the percentage of floating buffer memory to be included in the 
announced network memory. If it is 0.0, the job will run with the minimum 
required network memory and the issue of FLINK-12852 may happen more 
frequently. If it is 1.0, doubled network memory will be required and the issue 
of FLINK-12852 can be avoided.
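
As a sketch of the suggested fraction-style behavior (the formula and any 
config key derived from it are part of this proposal, not an existing Flink 
option):

{code:java}
// announced = minimum required buffers + fraction * floating buffers, with
// 0.0 announcing only the minimum and 1.0 announcing the full floating-buffer
// head room as well. Names are illustrative only.
class AnnouncedNetworkMemorySketch {

    static int announcedNetworkBuffers(
            int minimumRequiredBuffers, int floatingBuffers, double floatingBufferFraction) {
        return minimumRequiredBuffers + (int) Math.ceil(floatingBufferFraction * floatingBuffers);
    }
}
{code}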

> Automatically calculate required network memory for fine-grained jobs
> -
>
> Key: FLINK-15031
> URL: https://issues.apache.org/jira/browse/FLINK-15031
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Assignee: Jin Xing
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In cases where resources are specified, we expect each operator to declare 
> required resources before using them. In this way, no resource related error 
> should happen if resources are not used beyond what was declared. This 
> ensures a deployed task would not fail due to insufficient resources in TM, 
> which may result in unnecessary failures and may even cause a job hanging 
> forever, failing repeatedly on deploying tasks to a TM with insufficient 
> resources.
> Shuffle memory is the last missing piece for this goal at the moment. Minimum 
> network buffers are required by tasks to work. Currently a task is possible 
> to be deployed to a TM with insufficient network buffers, and fails on 
> launching.
> To avoid that, we should calculate required network memory for a 
> task/SlotSharingGroup before allocating a slot for it.
> The required shuffle memory can be derived from the number of required 
> network buffers. The number of buffers required by a task (ExecutionVertex) is
> {code:java}
> exclusive buffers for input channels(i.e. numInputChannel * 
> buffersPerChannel) + required buffers for result partition buffer 
> pool(currently is numberOfSubpartitions + 1)
> {code}
> Note that this is for the {{NettyShuffleService}} case. For custom shuffle 
> services, currently there is no way to get the required shuffle memory of a 
> task.
> To make it simple under dynamic slot sharing, the required shuffle memory for 
> a task should be the max required shuffle memory of all {{ExecutionVertex}} 
> of the same {{ExecutionJobVertex}}. And the required shuffle memory for a 
> slot sharing group should be the sum of shuffle memory for each 
> {{ExecutionJobVertex}} instance within.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15031) Automatically calculate required network memory for fine-grained jobs

2021-06-28 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-15031:

Summary: Automatically calculate required network memory for fine-grained 
jobs  (was: Automatically calculate required shuffle memory for fine-grained 
jobs)

> Automatically calculate required network memory for fine-grained jobs
> -
>
> Key: FLINK-15031
> URL: https://issues.apache.org/jira/browse/FLINK-15031
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Assignee: Jin Xing
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In cases where resources are specified, we expect each operator to declare 
> required resources before using them. In this way, no resource related error 
> should happen if resources are not used beyond what was declared. This 
> ensures a deployed task would not fail due to insufficient resources in TM, 
> which may result in unnecessary failures and may even cause a job hanging 
> forever, failing repeatedly on deploying tasks to a TM with insufficient 
> resources.
> Shuffle memory is the last missing piece for this goal at the moment. Minimum 
> network buffers are required by tasks to work. Currently a task is possible 
> to be deployed to a TM with insufficient network buffers, and fails on 
> launching.
> To avoid that, we should calculate required network memory for a 
> task/SlotSharingGroup before allocating a slot for it.
> The required shuffle memory can be derived from the number of required 
> network buffers. The number of buffers required by a task (ExecutionVertex) is
> {code:java}
> exclusive buffers for input channels(i.e. numInputChannel * 
> buffersPerChannel) + required buffers for result partition buffer 
> pool(currently is numberOfSubpartitions + 1)
> {code}
> Note that this is for the {{NettyShuffleService}} case. For custom shuffle 
> services, currently there is no way to get the required shuffle memory of a 
> task.
> To make it simple under dynamic slot sharing, the required shuffle memory for 
> a task should be the max required shuffle memory of all {{ExecutionVertex}} 
> of the same {{ExecutionJobVertex}}. And the required shuffle memory for a 
> slot sharing group should be the sum of shuffle memory for each 
> {{ExecutionJobVertex}} instance within.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Reopened] (FLINK-15031) Automatically calculate required shuffle memory for fine-grained jobs

2021-06-28 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reopened FLINK-15031:
-
  Assignee: Jin Xing  (was: Zhu Zhu)

> Automatically calculate required shuffle memory for fine-grained jobs
> -
>
> Key: FLINK-15031
> URL: https://issues.apache.org/jira/browse/FLINK-15031
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Assignee: Jin Xing
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In cases where resources are specified, we expect each operator to declare 
> required resources before using them. In this way, no resource related error 
> should happen if resources are not used beyond what was declared. This 
> ensures a deployed task would not fail due to insufficient resources in TM, 
> which may result in unnecessary failures and may even cause a job hanging 
> forever, failing repeatedly on deploying tasks to a TM with insufficient 
> resources.
> Shuffle memory is the last missing piece for this goal at the moment. Minimum 
> network buffers are required by tasks to work. Currently a task is possible 
> to be deployed to a TM with insufficient network buffers, and fails on 
> launching.
> To avoid that, we should calculate required network memory for a 
> task/SlotSharingGroup before allocating a slot for it.
> The required shuffle memory can be derived from the number of required 
> network buffers. The number of buffers required by a task (ExecutionVertex) is
> {code:java}
> exclusive buffers for input channels(i.e. numInputChannel * 
> buffersPerChannel) + required buffers for result partition buffer 
> pool(currently is numberOfSubpartitions + 1)
> {code}
> Note that this is for the {{NettyShuffleService}} case. For custom shuffle 
> services, currently there is no way to get the required shuffle memory of a 
> task.
> To make it simple under dynamic slot sharing, the required shuffle memory for 
> a task should be the max required shuffle memory of all {{ExecutionVertex}} 
> of the same {{ExecutionJobVertex}}. And the required shuffle memory for a 
> slot sharing group should be the sum of shuffle memory for each 
> {{ExecutionJobVertex}} instance within.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15031) Automatically calculate required shuffle memory for fine-grained jobs

2021-06-28 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-15031:

Summary: Automatically calculate required shuffle memory for fine-grained 
jobs  (was: Calculate required shuffle memory before allocating slots if 
resources are specified)

> Automatically calculate required shuffle memory for fine-grained jobs
> -
>
> Key: FLINK-15031
> URL: https://issues.apache.org/jira/browse/FLINK-15031
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In cases where resources are specified, we expect each operator to declare 
> required resources before using them. In this way, no resource related error 
> should happen if resources are not used beyond what was declared. This 
> ensures a deployed task would not fail due to insufficient resources in TM, 
> which may result in unnecessary failures and may even cause a job hanging 
> forever, failing repeatedly on deploying tasks to a TM with insufficient 
> resources.
> Shuffle memory is the last missing piece for this goal at the moment. Minimum 
> network buffers are required by tasks to work. Currently a task is possible 
> to be deployed to a TM with insufficient network buffers, and fails on 
> launching.
> To avoid that, we should calculate required network memory for a 
> task/SlotSharingGroup before allocating a slot for it.
> The required shuffle memory can be derived from the number of required 
> network buffers. The number of buffers required by a task (ExecutionVertex) is
> {code:java}
> exclusive buffers for input channels(i.e. numInputChannel * 
> buffersPerChannel) + required buffers for result partition buffer 
> pool(currently is numberOfSubpartitions + 1)
> {code}
> Note that this is for the {{NettyShuffleService}} case. For custom shuffle 
> services, currently there is no way to get the required shuffle memory of a 
> task.
> To make it simple under dynamic slot sharing, the required shuffle memory for 
> a task should be the max required shuffle memory of all {{ExecutionVertex}} 
> of the same {{ExecutionJobVertex}}. And the required shuffle memory for a 
> slot sharing group should be the sum of shuffle memory for each 
> {{ExecutionJobVertex}} instance within.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22945) StackOverflowException can happen when a large scale job is CANCELING/FAILING

2021-06-28 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-22945:

Fix Version/s: 1.13.2
   1.14.0

> StackOverflowException can happen when a large scale job is CANCELING/FAILING
> -
>
> Key: FLINK-22945
> URL: https://issues.apache.org/jira/browse/FLINK-22945
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.13.1, 1.12.4
>Reporter: Zhu Zhu
>Assignee: Gen Luo
>Priority: Critical
> Fix For: 1.14.0, 1.13.2
>
>
> The pending requests in ExecutionSlotAllocator are not cleared when a job 
> transitions to CANCELING or FAILING, while all vertices will be canceled and 
> assigned slot will be returned. The returned slot is possible to be used to 
> fulfill the pending request of a CANCELED vertex and the assignment will fail 
> immediately and the slot will be returned and used to fulfilled another 
> vertex in a recursive way. StackOverflow can happen in this way when there 
> are many vertices, and fatal error can happen and lead to JM will crash. A 
> sample call stack is attached below.
> To fix this problem, we should clear the pending requests in 
> ExecutionSlotAllocator when a job is CANCELING or FAILING. Besides that, I 
> think it's better to also improve the call stack of slot assignment to avoid 
> similar StackOverflowException to occur.
> ...
>   at 
> org.apache.flink.runtime.scheduler.SharedSlot.returnLogicalSlot(SharedSlot.java:234)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$returnSlotToOwner$0(SingleLogicalSlot.java:203)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705) 
> ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:717)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2010) 
> ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.returnSlotToOwner(SingleLogicalSlot.java:200)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.releaseSlotIfPresent(DefaultScheduler.java:533)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$8(DefaultScheduler.java:512)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
> ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) 
> ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequest.fulfill(DeclarativeSlotPoolBridge.java:552)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequestSlotMatching.fulfillPendingRequest(DeclarativeSlotPoolBridge.java:587)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.newSlotsAreAvailable(DeclarativeSlotPoolBridge.java:171)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.lambda$freeReservedSlot$0(DefaultDeclarativeSlotPool.java:316)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeReservedSlot(DefaultDeclarativeSlotPool.java:313)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.releaseSlot(DeclarativeSlotPoolBridge.java:335)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl.c

[jira] [Assigned] (FLINK-22945) StackOverflowException can happen when a large scale job is CANCELING/FAILING

2021-06-28 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-22945:
---

Assignee: Gen Luo  (was: Luo Gen)

> StackOverflowException can happen when a large scale job is CANCELING/FAILING
> -
>
> Key: FLINK-22945
> URL: https://issues.apache.org/jira/browse/FLINK-22945
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.13.1, 1.12.4
>Reporter: Zhu Zhu
>Assignee: Gen Luo
>Priority: Critical
>
> The pending requests in ExecutionSlotAllocator are not cleared when a job 
> transitions to CANCELING or FAILING, while all vertices will be canceled and 
> assigned slot will be returned. The returned slot is possible to be used to 
> fulfill the pending request of a CANCELED vertex and the assignment will fail 
> immediately and the slot will be returned and used to fulfilled another 
> vertex in a recursive way. StackOverflow can happen in this way when there 
> are many vertices, and fatal error can happen and lead to JM will crash. A 
> sample call stack is attached below.
> To fix this problem, we should clear the pending requests in 
> ExecutionSlotAllocator when a job is CANCELING or FAILING. Besides that, I 
> think it's better to also improve the call stack of slot assignment to avoid 
> similar StackOverflowException to occur.
> ...
>   at 
> org.apache.flink.runtime.scheduler.SharedSlot.returnLogicalSlot(SharedSlot.java:234)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$returnSlotToOwner$0(SingleLogicalSlot.java:203)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705) 
> ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:717)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2010) 
> ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.returnSlotToOwner(SingleLogicalSlot.java:200)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.releaseSlotIfPresent(DefaultScheduler.java:533)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$8(DefaultScheduler.java:512)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
> ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) 
> ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequest.fulfill(DeclarativeSlotPoolBridge.java:552)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequestSlotMatching.fulfillPendingRequest(DeclarativeSlotPoolBridge.java:587)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.newSlotsAreAvailable(DeclarativeSlotPoolBridge.java:171)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.lambda$freeReservedSlot$0(DefaultDeclarativeSlotPool.java:316)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeReservedSlot(DefaultDeclarativeSlotPool.java:313)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.releaseSlot(DeclarativeSlotPoolBridge.java:335)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl.cancelSlotRequest(PhysicalSlotProviderImpl.java:

[jira] [Assigned] (FLINK-22945) StackOverflowException can happen when a large scale job is CANCELING/FAILING

2021-06-28 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-22945:
---

Assignee: Luo Gen

> StackOverflowException can happen when a large scale job is CANCELING/FAILING
> -
>
> Key: FLINK-22945
> URL: https://issues.apache.org/jira/browse/FLINK-22945
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.13.1, 1.12.4
>Reporter: Zhu Zhu
>Assignee: Luo Gen
>Priority: Critical
>
> The pending requests in ExecutionSlotAllocator are not cleared when a job 
> transitions to CANCELING or FAILING, although all vertices will be canceled 
> and their assigned slots will be returned. A returned slot may then be used 
> to fulfill the pending request of a CANCELED vertex; that assignment fails 
> immediately, the slot is returned again and is used to fulfill yet another 
> vertex, and so on recursively. A StackOverflow can happen this way when there 
> are many vertices, and the resulting fatal error can crash the JM. A sample 
> call stack is attached below.
> To fix this problem, we should clear the pending requests in 
> ExecutionSlotAllocator when a job is CANCELING or FAILING. Besides that, I 
> think it's better to also improve the call stack of slot assignment so that 
> similar StackOverflowExceptions cannot occur.
> ...
>   at 
> org.apache.flink.runtime.scheduler.SharedSlot.returnLogicalSlot(SharedSlot.java:234)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$returnSlotToOwner$0(SingleLogicalSlot.java:203)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705) 
> ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:717)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2010) 
> ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.returnSlotToOwner(SingleLogicalSlot.java:200)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.releaseSlotIfPresent(DefaultScheduler.java:533)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$8(DefaultScheduler.java:512)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
> ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) 
> ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequest.fulfill(DeclarativeSlotPoolBridge.java:552)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequestSlotMatching.fulfillPendingRequest(DeclarativeSlotPoolBridge.java:587)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.newSlotsAreAvailable(DeclarativeSlotPoolBridge.java:171)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.lambda$freeReservedSlot$0(DefaultDeclarativeSlotPool.java:316)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeReservedSlot(DefaultDeclarativeSlotPool.java:313)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.releaseSlot(DeclarativeSlotPoolBridge.java:335)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl.cancelSlotRequest(PhysicalSlotProviderImpl.java:112)
>  ~[flink-

[jira] [Commented] (FLINK-22945) StackOverflowException can happen when a large scale job is CANCELING/FAILING

2021-06-28 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17370462#comment-17370462
 ] 

Zhu Zhu commented on FLINK-22945:
-

[~pltbkd] I have assigned you the ticket. Feel free to open a fix for it.
As discussed offline, if we break the recursive loop via an async call, the 
slot status may change before the async call is executed, which may introduce 
quite some complexity to avoid slot leaks. Given that the recursive loop would 
not happen if we correctly clean up the pending requests, I agree to just fix 
the entry to the recursive loop by clearing the pending requests in 
ExecutionSlotAllocator when a job is CANCELING or FAILING.
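
As a rough illustration of that fix, here is a minimal sketch of clearing the 
pending requests on the CANCELING/FAILING transition, which removes the entry 
point of the recursion. All class, field and method names below are made up 
for this example and are not Flink's actual code:

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Illustrative sketch only: once the job transitions to CANCELING/FAILING,
// cancel and drop all pending slot requests so that a slot returned by a
// canceled vertex can no longer recursively fulfill the next (already
// canceled) request.
class PendingRequestClearingSketch {

    enum JobStatus { RUNNING, CANCELING, FAILING }

    // Keyed by a vertex identifier; each future completes with an assigned slot.
    private final Map<String, CompletableFuture<Object>> pendingRequests = new HashMap<>();

    void onJobStatusChange(JobStatus newStatus) {
        if (newStatus == JobStatus.CANCELING || newStatus == JobStatus.FAILING) {
            clearPendingRequests();
        }
    }

    private void clearPendingRequests() {
        // Returned slots now go straight back to the pool instead of
        // triggering further (doomed) assignments.
        pendingRequests.values().forEach(future -> future.cancel(false));
        pendingRequests.clear();
    }
}
{code}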

> StackOverflowException can happen when a large scale job is CANCELING/FAILING
> -
>
> Key: FLINK-22945
> URL: https://issues.apache.org/jira/browse/FLINK-22945
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.13.1, 1.12.4
>Reporter: Zhu Zhu
>Assignee: Gen Luo
>Priority: Critical
>
> The pending requests in ExecutionSlotAllocator are not cleared when a job 
> transitions to CANCELING or FAILING, although all vertices will be canceled 
> and their assigned slots will be returned. A returned slot may then be used 
> to fulfill the pending request of a CANCELED vertex; that assignment fails 
> immediately, the slot is returned again and is used to fulfill yet another 
> vertex, and so on recursively. A StackOverflow can happen this way when there 
> are many vertices, and the resulting fatal error can crash the JM. A sample 
> call stack is attached below.
> To fix this problem, we should clear the pending requests in 
> ExecutionSlotAllocator when a job is CANCELING or FAILING. Besides that, I 
> think it's better to also improve the call stack of slot assignment so that 
> similar StackOverflowExceptions cannot occur.
> ...
>   at 
> org.apache.flink.runtime.scheduler.SharedSlot.returnLogicalSlot(SharedSlot.java:234)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$returnSlotToOwner$0(SingleLogicalSlot.java:203)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705) 
> ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:717)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2010) 
> ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.returnSlotToOwner(SingleLogicalSlot.java:200)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.releaseSlotIfPresent(DefaultScheduler.java:533)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$8(DefaultScheduler.java:512)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
> ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) 
> ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequest.fulfill(DeclarativeSlotPoolBridge.java:552)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequestSlotMatching.fulfillPendingRequest(DeclarativeSlotPoolBridge.java:587)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.newSlotsAreAvailable(DeclarativeSlotPoolBridge.java:171)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.lambda$freeReservedSlot$0(DefaultDeclarativeSlotPool.java:316)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_102]
>   at 
> org.apache.

[jira] [Updated] (FLINK-22945) StackOverflowException can happen when a large scale job is CANCELING/FAILING

2021-06-28 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-22945:

Priority: Critical  (was: Major)

> StackOverflowException can happen when a large scale job is CANCELING/FAILING
> -
>
> Key: FLINK-22945
> URL: https://issues.apache.org/jira/browse/FLINK-22945
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.13.1, 1.12.4
>Reporter: Zhu Zhu
>Priority: Critical
>
> The pending requests in ExecutionSlotAllocator are not cleared when a job 
> transitions to CANCELING or FAILING, although all vertices will be canceled 
> and their assigned slots will be returned. A returned slot may then be used 
> to fulfill the pending request of a CANCELED vertex; that assignment fails 
> immediately, the slot is returned again and is used to fulfill yet another 
> vertex, and so on recursively. A StackOverflow can happen this way when there 
> are many vertices, and the resulting fatal error can crash the JM. A sample 
> call stack is attached below.
> To fix this problem, we should clear the pending requests in 
> ExecutionSlotAllocator when a job is CANCELING or FAILING. Besides that, I 
> think it's better to also improve the call stack of slot assignment so that 
> similar StackOverflowExceptions cannot occur.
> ...
>   at 
> org.apache.flink.runtime.scheduler.SharedSlot.returnLogicalSlot(SharedSlot.java:234)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$returnSlotToOwner$0(SingleLogicalSlot.java:203)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705) 
> ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:717)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2010) 
> ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.returnSlotToOwner(SingleLogicalSlot.java:200)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.releaseSlotIfPresent(DefaultScheduler.java:533)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$8(DefaultScheduler.java:512)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
> ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) 
> ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequest.fulfill(DeclarativeSlotPoolBridge.java:552)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequestSlotMatching.fulfillPendingRequest(DeclarativeSlotPoolBridge.java:587)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.newSlotsAreAvailable(DeclarativeSlotPoolBridge.java:171)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.lambda$freeReservedSlot$0(DefaultDeclarativeSlotPool.java:316)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeReservedSlot(DefaultDeclarativeSlotPool.java:313)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.releaseSlot(DeclarativeSlotPoolBridge.java:335)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl.cancelSlotRequest(PhysicalSlotProviderImpl.java:112)
>  ~[flink-dist_2.11-1.13-vvr-4.0-

[jira] [Updated] (FLINK-22945) StackOverflowException can happen when a large scale job is CANCELING/FAILING

2021-06-28 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-22945:

Labels:   (was: auto-deprioritized-critical)

> StackOverflowException can happen when a large scale job is CANCELING/FAILING
> -
>
> Key: FLINK-22945
> URL: https://issues.apache.org/jira/browse/FLINK-22945
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.13.1, 1.12.4
>Reporter: Zhu Zhu
>Priority: Major
>
> The pending requests in ExecutionSlotAllocator are not cleared when a job 
> transitions to CANCELING or FAILING, although all vertices will be canceled 
> and their assigned slots will be returned. A returned slot may then be used 
> to fulfill the pending request of a CANCELED vertex; that assignment fails 
> immediately, the slot is returned again and is used to fulfill yet another 
> vertex, and so on recursively. A StackOverflow can happen this way when there 
> are many vertices, and the resulting fatal error can crash the JM. A sample 
> call stack is attached below.
> To fix this problem, we should clear the pending requests in 
> ExecutionSlotAllocator when a job is CANCELING or FAILING. Besides that, I 
> think it's better to also improve the call stack of slot assignment so that 
> similar StackOverflowExceptions cannot occur.
> ...
>   at 
> org.apache.flink.runtime.scheduler.SharedSlot.returnLogicalSlot(SharedSlot.java:234)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$returnSlotToOwner$0(SingleLogicalSlot.java:203)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705) 
> ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:717)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2010) 
> ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.returnSlotToOwner(SingleLogicalSlot.java:200)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.releaseSlotIfPresent(DefaultScheduler.java:533)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$8(DefaultScheduler.java:512)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
> ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) 
> ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequest.fulfill(DeclarativeSlotPoolBridge.java:552)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequestSlotMatching.fulfillPendingRequest(DeclarativeSlotPoolBridge.java:587)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.newSlotsAreAvailable(DeclarativeSlotPoolBridge.java:171)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.lambda$freeReservedSlot$0(DefaultDeclarativeSlotPool.java:316)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeReservedSlot(DefaultDeclarativeSlotPool.java:313)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.releaseSlot(DeclarativeSlotPoolBridge.java:335)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl.cancelSlotRequest(PhysicalSlotProviderImpl.java:112)
>  ~[flink-dist_2.11-1.13

[jira] [Commented] (FLINK-23153) Benchmark not compiling

2021-06-24 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17369262#comment-17369262
 ] 

Zhu Zhu commented on FLINK-23153:
-

Thanks for reporting this issue [~Thesharing]. I have assigned you the ticket.

> Benchmark not compiling
> ---
>
> Key: FLINK-23153
> URL: https://issues.apache.org/jira/browse/FLINK-23153
> Project: Flink
>  Issue Type: Bug
>  Components: Benchmarks
>Affects Versions: 1.14.0
>Reporter: Zhilong Hong
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> In FLINK-23085, FutureUtils was moved from flink-runtime to flink-core. The 
> reference in flink-benchmarks should also be changed. The reference is located 
> at: org/apache/flink/benchmark/operators/RecordSource.java.
> The Travis CI build is broken at the moment: 
> https://travis-ci.com/github/apache/flink-benchmarks/builds/230813827#L2026
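
For reference, the fix in flink-benchmarks boils down to updating the 
FutureUtils import in RecordSource.java to the class's new home in flink-core. 
A hedged sketch follows, assuming the class moved from 
org.apache.flink.runtime.concurrent to org.apache.flink.util.concurrent and 
kept its helpers unchanged:

{code:java}
// Before (flink-runtime), assumed old location:
// import org.apache.flink.runtime.concurrent.FutureUtils;

// After (flink-core), assumed new location:
import org.apache.flink.util.concurrent.FutureUtils;

import java.util.concurrent.CompletableFuture;

class RecordSourceImportExample {
    CompletableFuture<Void> done() {
        // Usage stays the same once the import points at the new package
        // (assuming completedVoidFuture() is unchanged by the move).
        return FutureUtils.completedVoidFuture();
    }
}
{code}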



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-23153) Benchmark not compiling

2021-06-24 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-23153:
---

Assignee: Zhilong Hong

> Benchmark not compiling
> ---
>
> Key: FLINK-23153
> URL: https://issues.apache.org/jira/browse/FLINK-23153
> Project: Flink
>  Issue Type: Bug
>  Components: Benchmarks
>Affects Versions: 1.14.0
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> In FLINK-23085, FutureUtils was moved from flink-runtime to flink-core. The 
> reference in flink-benchmarks should also be changed. The reference is located 
> at: org/apache/flink/benchmark/operators/RecordSource.java.
> The Travis CI build is broken at the moment: 
> https://travis-ci.com/github/apache/flink-benchmarks/builds/230813827#L2026



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23005) Optimize the deployment of tasks

2021-06-21 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17366577#comment-17366577
 ] 

Zhu Zhu commented on FLINK-23005:
-

Thanks [~Thesharing] for looking into the problem and proposing an improvement 
for it.
The proposal looks good to me and the POC results look quite promising.
I have assigned the ticket to you; feel free to open a PR for the improvement.
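
To make the first part of the proposal quoted below concrete, here is a 
minimal, hypothetical sketch of caching the compressed serialized 
ShuffleDescriptors so they are computed once per intermediate result and 
reused for every downstream task. The class and method names are illustrative, 
not Flink's actual API; in the real proposal, values whose size exceeds 
{{blob.offload.minsize}} would additionally be offloaded to the blob server.

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.zip.DeflaterOutputStream;

// Illustrative sketch: serialize and compress the shared ShuffleDescriptors
// once, then hand the same byte[] to all N downstream TaskDeploymentDescriptors
// instead of recomputing and reserializing them N times.
final class CachedShuffleDescriptorsSketch {

    private byte[] cachedCompressedBytes; // computed lazily, then reused

    byte[] getOrCompute(Serializable shuffleDescriptors) throws IOException {
        if (cachedCompressedBytes == null) {
            cachedCompressedBytes = serializeAndCompress(shuffleDescriptors);
        }
        return cachedCompressedBytes;
    }

    private static byte[] serializeAndCompress(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(new DeflaterOutputStream(bytes))) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }
}
{code}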

> Optimize the deployment of tasks
> 
>
> Key: FLINK-23005
> URL: https://issues.apache.org/jira/browse/FLINK-23005
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Major
> Fix For: 1.14.0
>
>
> h3. Introduction
> The optimizations introduced in FLINK-21110 so far have improved the 
> performance of job initialization, failover and partition releasing. 
> However, task deployment is still slow. For a job with two vertices, each 
> with a parallelism of 8k and connected by an all-to-all edge, it takes 
> 32.611s to deploy all the tasks and have them transition to running. If the 
> parallelisms are 16k, it may take more than 2 minutes.
> Since the creation of TaskDeploymentDescriptors runs in the main thread of 
> the jobmanager, the jobmanager cannot deal with other akka messages, such as 
> heartbeats and task status updates, for more than two minutes.
>  
> All in all, there are currently two issues in the deployment of tasks for 
> large-scale jobs:
>  # It takes a long time to deploy tasks, especially for all-to-all edges.
>  # Heartbeat timeouts may happen during or after task deployment. For a 
> streaming job, this causes a failover of the entire region. The job may 
> never transition to running, since another heartbeat timeout may occur while 
> the new tasks are being deployed.
> h3. Proposal
> Task deployment involves the following procedures:
>  # The jobmanager creates a TaskDeploymentDescriptor for each task in the 
> main thread
>  # The TaskDeploymentDescriptor is serialized in the future executor
>  # Akka transports the TaskDeploymentDescriptors to the TaskExecutors via 
> RPC calls
>  # The TaskExecutors create a new task thread and execute it
> The optimization contains two parts:
> *1. Cache the compressed serialized value of ShuffleDescriptors*
> The ShuffleDescriptors in a TaskDeploymentDescriptor describe the 
> IntermediateResultPartitions that a task consumes. For downstream vertices 
> connected with an all-to-all edge of parallelism _N_, we currently calculate 
> the _N_ ShuffleDescriptors _N_ times. However, these vertices all share the 
> same ShuffleDescriptors, since they consume the same 
> IntermediateResultPartitions, so we don't need to calculate the 
> ShuffleDescriptors for each downstream vertex individually; we can simply 
> cache them. This decreases the overall complexity of calculating 
> TaskDeploymentDescriptors from O(N^2) to O(N).
> Furthermore, we don't need to serialize the same ShuffleDescriptors _N_ 
> times, so we can cache the serialized value of the ShuffleDescriptors 
> instead of the original objects. To decrease the size of akka messages and 
> reduce the transmission of replicated data over the network, these 
> serialized values can be compressed.
> *2. Distribute the ShuffleDescriptors via blob server*
> For the ShuffleDescriptors of vertices with 8k parallelism, the size of the 
> serialized value is more than 700 kilobytes; after compression it would be 
> around 200 kilobytes. The overall size of 8k TaskDeploymentDescriptors is 
> more than 1.6 gigabytes. Since Akka cannot send the messages as fast as the 
> TaskDeploymentDescriptors are created, these TaskDeploymentDescriptors 
> become a heavy burden for the garbage collector to deal with.
> In the TaskDeploymentDescriptor, JobInformation and TaskInformation are 
> distributed via the blob server if their sizes exceed a certain threshold 
> (defined by {{blob.offload.minsize}}). TaskExecutors request the information 
> from the blob server once they begin to process the TaskDeploymentDescriptor. 
> This makes sure that the jobmanager doesn't need to keep all the copies in 
> heap memory until all the TaskDeploymentDescriptors are sent; there will be 
> only one copy on the blob server. Like the JobInformation, we can distribute 
> the cached ShuffleDescriptors via the blob server if their overall size 
> exceeds the threshold.
> h3. Summary
> In summary, the optimization of task deployment is to introduce a cache for 
> the TaskDeploymentDescriptor. We cache the compressed serialized value of 
> ShuffleDescriptors. If the size of the value exceeds a certain threshold, the 
> value would be distributed via t

[jira] [Assigned] (FLINK-23005) Optimize the deployment of tasks

2021-06-21 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-23005:
---

Assignee: Zhilong Hong

> Optimize the deployment of tasks
> 
>
> Key: FLINK-23005
> URL: https://issues.apache.org/jira/browse/FLINK-23005
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Major
> Fix For: 1.14.0
>
>
> h3. Introduction
> The optimizations introduced in FLINK-21110 so far have improved the 
> performance of job initialization, failover and partition releasing. 
> However, task deployment is still slow. For a job with two vertices, each 
> with a parallelism of 8k and connected by an all-to-all edge, it takes 
> 32.611s to deploy all the tasks and have them transition to running. If the 
> parallelisms are 16k, it may take more than 2 minutes.
> Since the creation of TaskDeploymentDescriptors runs in the main thread of 
> the jobmanager, the jobmanager cannot deal with other akka messages, such as 
> heartbeats and task status updates, for more than two minutes.
>  
> All in all, there are currently two issues in the deployment of tasks for 
> large-scale jobs:
>  # It takes a long time to deploy tasks, especially for all-to-all edges.
>  # Heartbeat timeouts may happen during or after task deployment. For a 
> streaming job, this causes a failover of the entire region. The job may 
> never transition to running, since another heartbeat timeout may occur while 
> the new tasks are being deployed.
> h3. Proposal
> Task deployment involves the following procedures:
>  # The jobmanager creates a TaskDeploymentDescriptor for each task in the 
> main thread
>  # The TaskDeploymentDescriptor is serialized in the future executor
>  # Akka transports the TaskDeploymentDescriptors to the TaskExecutors via 
> RPC calls
>  # The TaskExecutors create a new task thread and execute it
> The optimization contains two parts:
> *1. Cache the compressed serialized value of ShuffleDescriptors*
> The ShuffleDescriptors in a TaskDeploymentDescriptor describe the 
> IntermediateResultPartitions that a task consumes. For downstream vertices 
> connected with an all-to-all edge of parallelism _N_, we currently calculate 
> the _N_ ShuffleDescriptors _N_ times. However, these vertices all share the 
> same ShuffleDescriptors, since they consume the same 
> IntermediateResultPartitions, so we don't need to calculate the 
> ShuffleDescriptors for each downstream vertex individually; we can simply 
> cache them. This decreases the overall complexity of calculating 
> TaskDeploymentDescriptors from O(N^2) to O(N).
> Furthermore, we don't need to serialize the same ShuffleDescriptors _N_ 
> times, so we can cache the serialized value of the ShuffleDescriptors 
> instead of the original objects. To decrease the size of akka messages and 
> reduce the transmission of replicated data over the network, these 
> serialized values can be compressed.
> *2. Distribute the ShuffleDescriptors via blob server*
> For the ShuffleDescriptors of vertices with 8k parallelism, the size of the 
> serialized value is more than 700 kilobytes; after compression it would be 
> around 200 kilobytes. The overall size of 8k TaskDeploymentDescriptors is 
> more than 1.6 gigabytes. Since Akka cannot send the messages as fast as the 
> TaskDeploymentDescriptors are created, these TaskDeploymentDescriptors 
> become a heavy burden for the garbage collector to deal with.
> In the TaskDeploymentDescriptor, JobInformation and TaskInformation are 
> distributed via the blob server if their sizes exceed a certain threshold 
> (defined by {{blob.offload.minsize}}). TaskExecutors request the information 
> from the blob server once they begin to process the TaskDeploymentDescriptor. 
> This makes sure that the jobmanager doesn't need to keep all the copies in 
> heap memory until all the TaskDeploymentDescriptors are sent; there will be 
> only one copy on the blob server. Like the JobInformation, we can distribute 
> the cached ShuffleDescriptors via the blob server if their overall size 
> exceeds the threshold.
> h3. Summary
> In summary, the optimization of task deployment is to introduce a cache for 
> the TaskDeploymentDescriptor. We cache the compressed serialized value of 
> ShuffleDescriptors. If the size of the value exceeds a certain threshold, the 
> value would be distributed via the blob server.
> h3. Comparison
> We implemented a POC and conducted an experiment to compare the performance 
> of our optimization. We chose a streaming job for the experiment because no 
> task will be running until all tasks are deployed. This avoids other 
> d

[jira] [Updated] (FLINK-22945) StackOverflowException can happen when a large scale job is CANCELING/FAILING

2021-06-09 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-22945:

Description: 
The pending requests in ExecutionSlotAllocator are not cleared when a job 
transitions to CANCELING or FAILING, although all vertices will be canceled 
and their assigned slots will be returned. A returned slot may then be used to 
fulfill the pending request of a CANCELED vertex; that assignment fails 
immediately, the slot is returned again and is used to fulfill yet another 
vertex, and so on recursively. A StackOverflow can happen this way when there 
are many vertices, and the resulting fatal error can crash the JM. A sample 
call stack is attached below.
To fix this problem, we should clear the pending requests in 
ExecutionSlotAllocator when a job is CANCELING or FAILING. Besides that, I 
think it's better to also improve the call stack of slot assignment so that 
similar StackOverflowExceptions cannot occur.


...
at 
org.apache.flink.runtime.scheduler.SharedSlot.returnLogicalSlot(SharedSlot.java:234)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$returnSlotToOwner$0(SingleLogicalSlot.java:203)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705) 
~[?:1.8.0_102]
at 
java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:717) 
~[?:1.8.0_102]
at 
java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2010) 
~[?:1.8.0_102]
at 
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.returnSlotToOwner(SingleLogicalSlot.java:200)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.releaseSlotIfPresent(DefaultScheduler.java:533)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$8(DefaultScheduler.java:512)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
~[?:1.8.0_102]
at 
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
 ~[?:1.8.0_102]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
~[?:1.8.0_102]
at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) 
~[?:1.8.0_102]
at 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequest.fulfill(DeclarativeSlotPoolBridge.java:552)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequestSlotMatching.fulfillPendingRequest(DeclarativeSlotPoolBridge.java:587)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.newSlotsAreAvailable(DeclarativeSlotPoolBridge.java:171)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.lambda$freeReservedSlot$0(DefaultDeclarativeSlotPool.java:316)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_102]
at 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeReservedSlot(DefaultDeclarativeSlotPool.java:313)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.releaseSlot(DeclarativeSlotPoolBridge.java:335)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl.cancelSlotRequest(PhysicalSlotProviderImpl.java:112)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.releaseSharedSlot(SlotSharingExecutionSlotAllocator.java:242)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.SharedSlot.releaseExternally(SharedSlot.java:281)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.SharedSlot.removeLogicalSlotRequest(SharedSlot.java:242)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
org

[jira] [Updated] (FLINK-22945) StackOverflowException can happen when a large scale job is CANCELING/FAILING

2021-06-09 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-22945:

Component/s: Runtime / Coordination

> StackOverflowException can happen when a large scale job is CANCELING/FAILING
> -
>
> Key: FLINK-22945
> URL: https://issues.apache.org/jira/browse/FLINK-22945
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.13.1, 1.12.4
>Reporter: Zhu Zhu
>Priority: Critical
>
> The pending requests in ExecutionSlotAllocator are not cleared when a job 
> transitions to CANCELING or FAILING, although all vertices will be canceled 
> and their assigned slots will be returned. A returned slot may then be used 
> to fulfill the pending request of a CANCELED vertex; that assignment fails 
> immediately, the slot is returned again and is used to fulfill yet another 
> vertex, and so on recursively. A StackOverflow can happen this way when there 
> are many vertices, and the resulting fatal error can crash the JM. A sample 
> call stack is attached below.
> To fix this problem, we should clear the pending requests in 
> ExecutionSlotAllocator when a job is CANCELING or FAILING. However, I think 
> it's better to improve the call stack of slot assignment so that similar 
> StackOverflowExceptions cannot occur.
> ...
>   at 
> org.apache.flink.runtime.scheduler.SharedSlot.returnLogicalSlot(SharedSlot.java:234)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$returnSlotToOwner$0(SingleLogicalSlot.java:203)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705) 
> ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:717)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2010) 
> ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.returnSlotToOwner(SingleLogicalSlot.java:200)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.releaseSlotIfPresent(DefaultScheduler.java:533)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$8(DefaultScheduler.java:512)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
> ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) 
> ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequest.fulfill(DeclarativeSlotPoolBridge.java:552)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequestSlotMatching.fulfillPendingRequest(DeclarativeSlotPoolBridge.java:587)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.newSlotsAreAvailable(DeclarativeSlotPoolBridge.java:171)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.lambda$freeReservedSlot$0(DefaultDeclarativeSlotPool.java:316)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeReservedSlot(DefaultDeclarativeSlotPool.java:313)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.releaseSlot(DeclarativeSlotPoolBridge.java:335)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl.cancelSlotRequest(PhysicalSlotProviderImpl.java:112)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHO

[jira] [Updated] (FLINK-22945) StackOverflowException can happen when a large scale job is CANCELING/FAILING

2021-06-09 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-22945:

Summary: StackOverflowException can happen when a large scale job is 
CANCELING/FAILING  (was: StackOverflowException can happen when a large scale 
job is CANCELED/FAILED)

> StackOverflowException can happen when a large scale job is CANCELING/FAILING
> -
>
> Key: FLINK-22945
> URL: https://issues.apache.org/jira/browse/FLINK-22945
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.13.1, 1.12.4
>Reporter: Zhu Zhu
>Priority: Critical
>
> The pending requests in ExecutionSlotAllocator are not cleared when a job 
> transitions to CANCELING or FAILING, although all vertices will be canceled 
> and their assigned slots will be returned. A returned slot may then be used 
> to fulfill the pending request of a CANCELED vertex; that assignment fails 
> immediately, the slot is returned again and is used to fulfill yet another 
> vertex, and so on recursively. A StackOverflow can happen this way when there 
> are many vertices, and the resulting fatal error can crash the JM. A sample 
> call stack is attached below.
> To fix this problem, we should clear the pending requests in 
> ExecutionSlotAllocator when a job is CANCELING or FAILING. However, I think 
> it's better to improve the call stack of slot assignment so that similar 
> StackOverflowExceptions cannot occur.
> ...
>   at 
> org.apache.flink.runtime.scheduler.SharedSlot.returnLogicalSlot(SharedSlot.java:234)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$returnSlotToOwner$0(SingleLogicalSlot.java:203)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705) 
> ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:717)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2010) 
> ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.returnSlotToOwner(SingleLogicalSlot.java:200)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.releaseSlotIfPresent(DefaultScheduler.java:533)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$8(DefaultScheduler.java:512)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
> ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) 
> ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequest.fulfill(DeclarativeSlotPoolBridge.java:552)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequestSlotMatching.fulfillPendingRequest(DeclarativeSlotPoolBridge.java:587)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.newSlotsAreAvailable(DeclarativeSlotPoolBridge.java:171)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.lambda$freeReservedSlot$0(DefaultDeclarativeSlotPool.java:316)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeReservedSlot(DefaultDeclarativeSlotPool.java:313)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.releaseSlot(DeclarativeSlotPoolBridge.java:335)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl.can

[jira] [Updated] (FLINK-22945) StackOverflowException can happen when a large scale job is CANCELED/FAILED

2021-06-09 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-22945:

Description: 
The pending requests in ExecutionSlotAllocator are not cleared when a job 
transitions to CANCELING or FAILING, although all vertices will be canceled 
and their assigned slots will be returned. A returned slot may then be used to 
fulfill the pending request of a CANCELED vertex; that assignment fails 
immediately, the slot is returned again and is used to fulfill yet another 
vertex, and so on recursively. A StackOverflow can happen this way when there 
are many vertices, and the resulting fatal error can crash the JM. A sample 
call stack is attached below.
To fix this problem, we should clear the pending requests in 
ExecutionSlotAllocator when a job is CANCELING or FAILING. However, I think 
it's better to improve the call stack of slot assignment so that similar 
StackOverflowExceptions cannot occur.


...
at 
org.apache.flink.runtime.scheduler.SharedSlot.returnLogicalSlot(SharedSlot.java:234)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$returnSlotToOwner$0(SingleLogicalSlot.java:203)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705) 
~[?:1.8.0_102]
at 
java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:717) 
~[?:1.8.0_102]
at 
java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2010) 
~[?:1.8.0_102]
at 
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.returnSlotToOwner(SingleLogicalSlot.java:200)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.releaseSlotIfPresent(DefaultScheduler.java:533)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$8(DefaultScheduler.java:512)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
~[?:1.8.0_102]
at 
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
 ~[?:1.8.0_102]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
~[?:1.8.0_102]
at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) 
~[?:1.8.0_102]
at 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequest.fulfill(DeclarativeSlotPoolBridge.java:552)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequestSlotMatching.fulfillPendingRequest(DeclarativeSlotPoolBridge.java:587)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.newSlotsAreAvailable(DeclarativeSlotPoolBridge.java:171)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.lambda$freeReservedSlot$0(DefaultDeclarativeSlotPool.java:316)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_102]
at 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeReservedSlot(DefaultDeclarativeSlotPool.java:313)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.releaseSlot(DeclarativeSlotPoolBridge.java:335)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl.cancelSlotRequest(PhysicalSlotProviderImpl.java:112)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.releaseSharedSlot(SlotSharingExecutionSlotAllocator.java:242)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.SharedSlot.releaseExternally(SharedSlot.java:281)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.SharedSlot.removeLogicalSlotRequest(SharedSlot.java:242)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
org.apache.fl

[jira] [Updated] (FLINK-22945) StackOverflowException can happen when a large scale job is CANCELED/FAILED

2021-06-09 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-22945:

Priority: Critical  (was: Major)

> StackOverflowException can happen when a large scale job is CANCELED/FAILED
> ---
>
> Key: FLINK-22945
> URL: https://issues.apache.org/jira/browse/FLINK-22945
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.13.1, 1.12.4
>Reporter: Zhu Zhu
>Priority: Critical
>
> The pending requests in ExecutionSlotAllocator are not cleared when a job 
> transitions to CANCELING or FAILING, although all vertices will be canceled 
> and their assigned slots will be returned. A returned slot may then be used 
> to fulfill the pending request of a CANCELED vertex; that assignment fails 
> immediately, the slot is returned again and is used to fulfill yet another 
> vertex, and so on recursively. A StackOverflow can happen this way when there 
> are many vertices. A sample call stack is attached below.
> To fix this problem, we should clear the pending requests in 
> ExecutionSlotAllocator when a job is CANCELING or FAILING. However, I think 
> it's better to improve the call stack of slot assignment so that similar 
> StackOverflowExceptions cannot occur.
> ...
>   at 
> org.apache.flink.runtime.scheduler.SharedSlot.returnLogicalSlot(SharedSlot.java:234)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$returnSlotToOwner$0(SingleLogicalSlot.java:203)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705) 
> ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:717)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2010) 
> ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.returnSlotToOwner(SingleLogicalSlot.java:200)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.releaseSlotIfPresent(DefaultScheduler.java:533)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$8(DefaultScheduler.java:512)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
> ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) 
> ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequest.fulfill(DeclarativeSlotPoolBridge.java:552)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequestSlotMatching.fulfillPendingRequest(DeclarativeSlotPoolBridge.java:587)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.newSlotsAreAvailable(DeclarativeSlotPoolBridge.java:171)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.lambda$freeReservedSlot$0(DefaultDeclarativeSlotPool.java:316)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeReservedSlot(DefaultDeclarativeSlotPool.java:313)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.releaseSlot(DeclarativeSlotPoolBridge.java:335)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl.cancelSlotRequest(PhysicalSlotProviderImpl.java:112)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocato
