[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly

2023-06-29 Thread loyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17738851#comment-17738851
 ] 

loyi commented on FLINK-23190:
--

[~heigebupahei]  I implemented it against Flink 1.13.2, so I am not sure whether 
the behaviour you saw is related to version changes. I will try to find out the 
reason, but no guarantees. 

 

 

 

> Make task-slot allocation much more evenly
> --
>
> Key: FLINK-23190
> URL: https://issues.apache.org/jira/browse/FLINK-23190
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0, 1.12.3, 1.13.1
>Reporter: loyi
>Assignee: loyi
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Attachments: image-2021-07-16-10-34-30-700.png
>
>
> FLINK-12122 only guarantees spreading out tasks across the set of TMs which 
> are registered at the time of scheduling, but our jobs all run in active 
> YARN mode, and jobs with smaller source parallelism often cause 
> load-balance issues. 
>  
> For this job:
> {code:java}
> // -ys 4 (4 slots per TaskManager); max parallelism 40 => 10 taskmanagers
> env.addSource(...).name("A").setParallelism(10)
>  .map(...).name("B").setParallelism(30)
>  .map(...).name("C").setParallelism(40)
>  .addSink(...).name("D").setParallelism(20);
> {code}
>  
>  Flink-1.12.3 task allocation: 
> ||operator||tm1||tm2||tm3||tm4||tm5||tm6||tm7||tm8||tm9||tm10||
> |A|1|{color:#de350b}2{color}|{color:#de350b}2{color}|1|1|{color:#de350b}3{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|
> |B|3|3|3|3|3|3|3|3|{color:#de350b}2{color}|{color:#de350b}4{color}|
> |C|4|4|4|4|4|4|4|4|4|4|
> |D|2|2|2|2|2|{color:#de350b}1{color}|{color:#de350b}1{color}|2|2|{color:#de350b}4{color}|
>  
> Suggestions:
> When TaskManagers start registering slots with the SlotManager, the current 
> processing logic chooses the first pendingSlot that meets the resource 
> requirements. This "random" strategy usually causes uneven task allocation 
> when the source operator's parallelism is significantly below the process 
> operators'. A simple, feasible idea is to {color:#de350b}partition{color} the 
> current "{color:#de350b}pendingSlots{color}" by their "JobVertexIds" (e.g. 
> let the AllocationID carry that detail), then allocate the slots 
> proportionally to each JobVertexGroup.
>  
> For the above case, the 40 pendingSlots could be divided into 4 groups:
> [ABCD]: 10        // A, B, C, D represent {color:#de350b}jobVertexId{color}s
> [BCD]: 10
> [CD]: 10
> [D]: 10
>  
> Every taskmanager will provide 4 slots at a time, and each group will get 1 
> slot according to its proportion (1/4). The final allocation result is below:
> [ABCD]: deployed on 10 different taskmanagers
> [BCD]: deployed on 10 different taskmanagers
> [CD]: deployed on 10 different taskmanagers
> [D]: deployed on 10 different taskmanagers
>  
> I have implemented a [concept 
> code|https://github.com/saddays/flink/commit/dc82e60a7c7599fbcb58c14f8e3445bc8d07ace1]
> based on Flink-1.12.3. The patched version achieves {color:#de350b}fully 
> even{color} task allocation and works well on my workload. Are there other 
> points that have not been considered, or does it conflict with future 
> plans? Sorry for my poor English.
>  
>  
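The proportional partitioning described in the quoted issue can be sketched as a small, self-contained model. This is only a toy illustration of the idea, not the actual patch: the class name, the `allocate` method, and the group keys are hypothetical stand-ins for Flink's internal SlotManager/pendingSlot structures.

```java
import java.util.*;

// Toy sketch: pendingSlots are pre-partitioned into groups keyed by the set
// of JobVertexIds they serve (e.g. "[ABCD]"); each TaskManager offers its
// slots in one batch, and each group receives slots from that batch in
// proportion to its share of the total demand.
public class ProportionalSlotSketch {
    public static Map<String, List<String>> allocate(
            Map<String, Integer> groupDemand, int numTms, int slotsPerTm) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        groupDemand.keySet().forEach(g -> assignment.put(g, new ArrayList<>()));
        int totalDemand =
                groupDemand.values().stream().mapToInt(Integer::intValue).sum();
        for (int tm = 0; tm < numTms; tm++) {
            for (Map.Entry<String, Integer> e : groupDemand.entrySet()) {
                // Share of this TM's batch owed to the group (integer part).
                int share = e.getValue() * slotsPerTm / totalDemand;
                for (int s = 0; s < share; s++) {
                    assignment.get(e.getKey()).add("tm" + (tm + 1));
                }
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        // The four groups from the example, 10 slots each; 10 TMs with -ys 4.
        Map<String, Integer> groups = new LinkedHashMap<>();
        groups.put("[ABCD]", 10);
        groups.put("[BCD]", 10);
        groups.put("[CD]", 10);
        groups.put("[D]", 10);
        // Each group ends up with one slot on every TaskManager.
        allocate(groups, 10, 4).forEach((g, tms) ->
                System.out.println(g + " -> " + tms));
    }
}
```

With equal demand (1/4 each) and 4 slots per batch, every group draws exactly one slot from every TaskManager, reproducing the fully even layout claimed in the description.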



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly

2023-06-27 Thread loyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17737930#comment-17737930
 ] 

loyi commented on FLINK-23190:
--

[~heigebupahei]   Thanks for trying it out. Did you enable the config option 
*jobmanager.scheduler.enable-slot-allocate-order-optimization*, which was added 
for the patch? If you did, could you provide the example code? I will try to 
see why it did not work.   
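For reference, enabling the option mentioned above would look like this in flink-conf.yaml. Note the key comes from the patch, not from an upstream Flink release, so it only exists on the patched build:

```yaml
# Patch-specific option (not part of upstream Flink); key taken from the comment above.
jobmanager.scheduler.enable-slot-allocate-order-optimization: true
```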

 



[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly

2023-06-26 Thread loyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17737413#comment-17737413
 ] 

loyi commented on FLINK-23190:
--

[~heigebupahei]   Looking forward to your feedback. :)

 



[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly

2022-03-09 Thread loyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503952#comment-17503952
 ] 

loyi commented on FLINK-23190:
--

Thank you both for your kind replies. 



[jira] [Updated] (FLINK-23190) Make task-slot allocation much more evenly

2022-03-08 Thread loyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

loyi updated FLINK-23190:
-
Flags:   (was: Patch)



[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly

2022-03-07 Thread loyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17502196#comment-17502196
 ] 

loyi commented on FLINK-23190:
--

Hi [~trohrmann], sorry to disturb you. Is this feature still being considered, 
or does the community have another plan or solution? Please give me some 
feedback. Thanks.



[jira] [Updated] (FLINK-23190) Make task-slot allocation much more evenly

2021-11-23 Thread loyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

loyi updated FLINK-23190:
-
Affects Version/s: 1.13.1
   1.14.0



[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly

2021-11-08 Thread loyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17440937#comment-17440937
 ] 

loyi commented on FLINK-23190:
--

Hi [~trohrmann], it seems to be a few months since we last talked about this 
issue. Could I ask how things are going?  



[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly

2021-08-30 Thread loyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17407022#comment-17407022
 ] 

loyi commented on FLINK-23190:
--

Get it . Thanks.



[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly

2021-08-30 Thread loyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17406616#comment-17406616
 ] 

loyi commented on FLINK-23190:
--

I have submitted a [PR|https://github.com/apache/flink/pull/16929] for this 
feature; do you have time to take a look? Thanks.  [~trohrmann] 

> Make task-slot allocation much more evenly
> --
>
> Key: FLINK-23190
> URL: https://issues.apache.org/jira/browse/FLINK-23190
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.12.3
>Reporter: loyi
>Assignee: loyi
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2021-07-16-10-34-30-700.png
>
>
> FLINK-12122 only guarantees spreading out tasks across the set of TMs which 
> are registered at the time of scheduling, but our jobs are all runing on 
> active yarn mode, the job with smaller source parallelism offen cause 
> load-balance issues. 
>  
> For this job:
> {code:java}
> //  -ys 4 means 10 taskmanagers
> env.addSource(...).name("A").setParallelism(10).
>  map(...).name("B").setParallelism(30)
>  .map(...).name("C").setParallelism(40)
>  .addSink(...).name("D").setParallelism(20);
> {code}
>  
>  Flink-1.12.3 task allocation: 
> ||operator||tm1 ||tm2||tm3||tm4||tm5||5m6||tm7||tm8||tm9||tm10||
> |A| 
> 1|{color:#de350b}2{color}|{color:#de350b}2{color}|1|1|{color:#de350b}3{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|
> |B|3|3|3|3|3|3|3|3|{color:#de350b}2{color}|{color:#de350b}4{color}|
> |C|4|4|4|4|4|4|4|4|4|4|
> |D|2|2|2|2|2|{color:#de350b}1{color}|{color:#de350b}1{color}|2|2|{color:#de350b}4{color}|
>  
> Suggestions:
> When a TaskManager starts registering its slots to the SlotManager, the 
> current processing logic chooses the first pendingSlot that meets its 
> resource requirements.  This "random" strategy usually causes uneven task 
> allocation when the source operator's parallelism is significantly below the 
> processing operators'.  A simple feasible idea is to 
> {color:#de350b}partition{color} the current 
> "{color:#de350b}pendingSlots{color}" by their "JobVertexIds" (e.g. let the 
> AllocationID carry this detail), then allocate the slots proportionally to 
> each JobVertexGroup.
>  
> For above case, the 40 pendingSlots could be divided into 4 groups:
> [ABCD]: 10        // A, B, C, D represent {color:#de350b}jobVertexIds{color}
> [BCD]: 10
> [CD]: 10
> [D]: 10
>  
> Every TaskManager provides 4 slots at a time, and each group gets 1 slot 
> according to its proportion (1/4); the final allocation result is below:
> [ABCD]: deployed on 10 different TaskManagers
> [BCD]: deployed on 10 different TaskManagers
> [CD]: deployed on 10 different TaskManagers
> [D]: deployed on 10 different TaskManagers
>  
> I have implemented a [concept 
> code|https://github.com/saddays/flink/commit/dc82e60a7c7599fbcb58c14f8e3445bc8d07ace1]
>  based on Flink-1.12.3; the patched version achieves {color:#de350b}fully 
> even{color} task allocation and works well on my workload.  Are there other 
> points that have not been considered, or does it conflict with future plans? 
> Sorry for my poor English.
>  
>  
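The proportional round-robin scheme described above can be simulated outside of Flink; the following is a minimal standalone sketch (plain Java, no Flink APIs; the group names, slot counts, and the `SlotGroupRoundRobin`/`allocate` names are invented for illustration):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SlotGroupRoundRobin {

    // Hands out each TaskManager's slots one per group in turn, so every
    // group is spread across as many TaskManagers as possible.
    static Map<String, List<Integer>> allocate(
            Map<String, Integer> pendingByGroup, int taskManagers, int slotsPerTm) {
        Map<String, List<Integer>> placement = new LinkedHashMap<>();
        pendingByGroup.keySet().forEach(g -> placement.put(g, new ArrayList<>()));
        for (int tm = 0; tm < taskManagers; tm++) {
            int offered = 0;
            for (String group : pendingByGroup.keySet()) {
                if (offered == slotsPerTm || pendingByGroup.get(group) == 0) {
                    continue;
                }
                pendingByGroup.put(group, pendingByGroup.get(group) - 1);
                placement.get(group).add(tm);   // group gets one slot on this TM
                offered++;
            }
        }
        return placement;
    }

    public static void main(String[] args) {
        // The four pending-slot groups from the ticket, 10 slots each.
        Map<String, Integer> pending = new LinkedHashMap<>();
        pending.put("ABCD", 10);
        pending.put("BCD", 10);
        pending.put("CD", 10);
        pending.put("D", 10);
        allocate(pending, 10, 4)
                .forEach((g, tms) -> System.out.println(g + " -> " + tms.size() + " TMs"));
    }
}
```

With 10 TaskManagers offering 4 slots each, every group lands on 10 distinct TaskManagers, matching the fully even placement the ticket describes.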





[jira] [Commented] (FLINK-23848) PulsarSourceITCase is failed on Azure

2021-08-23 Thread loyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17402968#comment-17402968
 ] 

loyi commented on FLINK-23848:
--

I met a similar issue.

[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=22612=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461=25015]

 

> PulsarSourceITCase is failed on Azure
> -
>
> Key: FLINK-23848
> URL: https://issues.apache.org/jira/browse/FLINK-23848
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.14.0
>Reporter: Jark Wu
>Assignee: Yufan Sheng
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.14.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=22412=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461
> {code}
> 2021-08-17T20:11:53.7228789Z Aug 17 20:11:53 [INFO] Running 
> org.apache.flink.connector.pulsar.source.PulsarSourceITCase
> 2021-08-17T20:17:38.2429467Z Aug 17 20:17:38 [ERROR] Tests run: 8, Failures: 
> 0, Errors: 1, Skipped: 0, Time elapsed: 344.515 s <<< FAILURE! - in 
> org.apache.flink.connector.pulsar.source.PulsarSourceITCase
> 2021-08-17T20:17:38.2430693Z Aug 17 20:17:38 [ERROR] 
> testMultipleSplits{TestEnvironment, ExternalContext}[2]  Time elapsed: 66.766 
> s  <<< ERROR!
> 2021-08-17T20:17:38.2431387Z Aug 17 20:17:38 java.lang.RuntimeException: 
> Failed to fetch next result
> 2021-08-17T20:17:38.2432035Z Aug 17 20:17:38  at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
> 2021-08-17T20:17:38.2433345Z Aug 17 20:17:38  at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
> 2021-08-17T20:17:38.2434175Z Aug 17 20:17:38  at 
> org.apache.flink.connectors.test.common.utils.TestDataMatchers$MultipleSplitDataMatcher.matchesSafely(TestDataMatchers.java:151)
> 2021-08-17T20:17:38.2435028Z Aug 17 20:17:38  at 
> org.apache.flink.connectors.test.common.utils.TestDataMatchers$MultipleSplitDataMatcher.matchesSafely(TestDataMatchers.java:133)
> 2021-08-17T20:17:38.2438387Z Aug 17 20:17:38  at 
> org.hamcrest.TypeSafeDiagnosingMatcher.matches(TypeSafeDiagnosingMatcher.java:55)
> 2021-08-17T20:17:38.2439100Z Aug 17 20:17:38  at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:12)
> 2021-08-17T20:17:38.2439708Z Aug 17 20:17:38  at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
> 2021-08-17T20:17:38.2440299Z Aug 17 20:17:38  at 
> org.apache.flink.connectors.test.common.testsuites.SourceTestSuiteBase.testMultipleSplits(SourceTestSuiteBase.java:156)
> 2021-08-17T20:17:38.2441007Z Aug 17 20:17:38  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2021-08-17T20:17:38.2441526Z Aug 17 20:17:38  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2021-08-17T20:17:38.2442068Z Aug 17 20:17:38  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2021-08-17T20:17:38.2442759Z Aug 17 20:17:38  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2021-08-17T20:17:38.2443247Z Aug 17 20:17:38  at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
> 2021-08-17T20:17:38.2443812Z Aug 17 20:17:38  at 
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
> 2021-08-17T20:17:38.241Z Aug 17 20:17:38  at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
> 2021-08-17T20:17:38.2445101Z Aug 17 20:17:38  at 
> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
> 2021-08-17T20:17:38.2445688Z Aug 17 20:17:38  at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
> 2021-08-17T20:17:38.2446328Z Aug 17 20:17:38  at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:92)
> 2021-08-17T20:17:38.2447303Z Aug 17 20:17:38  at 
> org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
> 2021-08-17T20:17:38.2448336Z Aug 17 20:17:38  at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
> 2021-08-17T20:17:38.2448999Z Aug 17 20:17:38  at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
> 2021-08-17T20:17:38.2449689Z Aug 17 20:17:38  at 
> 

[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly

2021-08-19 Thread loyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17401642#comment-17401642
 ] 

loyi commented on FLINK-23190:
--

[~trohrmann]  I'm willing to contribute this feature.  

> Make task-slot allocation much more evenly
> --
>
> Key: FLINK-23190
> URL: https://issues.apache.org/jira/browse/FLINK-23190
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.12.3
>Reporter: loyi
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2021-07-16-10-34-30-700.png
>
>
> FLINK-12122 only guarantees spreading out tasks across the set of TMs which 
> are registered at the time of scheduling, but our jobs all run in active 
> YARN mode, and a job with a smaller source parallelism often causes 
> load-balance issues. 
>  
> For this job:
> {code:java}
> //  -ys 4 means 10 taskmanagers
> env.addSource(...).name("A").setParallelism(10)
>  .map(...).name("B").setParallelism(30)
>  .map(...).name("C").setParallelism(40)
>  .addSink(...).name("D").setParallelism(20);
> {code}
>  
>  Flink-1.12.3 task allocation: 
> ||operator||tm1 ||tm2||tm3||tm4||tm5||tm6||tm7||tm8||tm9||tm10||
> |A| 
> 1|{color:#de350b}2{color}|{color:#de350b}2{color}|1|1|{color:#de350b}3{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|
> |B|3|3|3|3|3|3|3|3|{color:#de350b}2{color}|{color:#de350b}4{color}|
> |C|4|4|4|4|4|4|4|4|4|4|
> |D|2|2|2|2|2|{color:#de350b}1{color}|{color:#de350b}1{color}|2|2|{color:#de350b}4{color}|
>  
> Suggestions:
> When a TaskManager starts registering its slots to the SlotManager, the 
> current processing logic chooses the first pendingSlot that meets its 
> resource requirements.  This "random" strategy usually causes uneven task 
> allocation when the source operator's parallelism is significantly below the 
> processing operators'.  A simple feasible idea is to 
> {color:#de350b}partition{color} the current 
> "{color:#de350b}pendingSlots{color}" by their "JobVertexIds" (e.g. let the 
> AllocationID carry this detail), then allocate the slots proportionally to 
> each JobVertexGroup.
>  
> For above case, the 40 pendingSlots could be divided into 4 groups:
> [ABCD]: 10        // A, B, C, D represent {color:#de350b}jobVertexIds{color}
> [BCD]: 10
> [CD]: 10
> [D]: 10
>  
> Every TaskManager provides 4 slots at a time, and each group gets 1 slot 
> according to its proportion (1/4); the final allocation result is below:
> [ABCD]: deployed on 10 different TaskManagers
> [BCD]: deployed on 10 different TaskManagers
> [CD]: deployed on 10 different TaskManagers
> [D]: deployed on 10 different TaskManagers
>  
> I have implemented a [concept 
> code|https://github.com/saddays/flink/commit/dc82e60a7c7599fbcb58c14f8e3445bc8d07ace1]
>  based on Flink-1.12.3; the patched version achieves {color:#de350b}fully 
> even{color} task allocation and works well on my workload.  Are there other 
> points that have not been considered, or does it conflict with future plans? 
> Sorry for my poor English.
>  
>  





[jira] [Comment Edited] (FLINK-23190) Make task-slot allocation much more evenly

2021-08-12 Thread loyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17397921#comment-17397921
 ] 

loyi edited comment on FLINK-23190 at 8/12/21, 8:21 AM:


Thanks for your reply [~trohrmann] :)

I agree with you that group details shouldn't be put in the SlotRequestId, so I 
found another solution; the idea is below:

As we know, the DefaultScheduler offers slots to PendingRequests in request 
order (because PendingRequests are stored in a LinkedHashMap), so we can 
control the ExecutionSlotSharingGroup slot-request order to achieve the same 
goal, and it only involves the ExecutionSlotAllocator's implementation.

Here is the concept code:
{code:java}
@Override
public List<SlotExecutionVertexAssignment> allocateSlotsFor(
        List<ExecutionVertexID> executionVertexIds) {

    // bla bla

    Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> executionsByGroup =
            executionVertexIds.stream()
                    .collect(
                            Collectors.groupingBy(
                                    slotSharingStrategy::getExecutionSlotSharingGroup));

    Map<ExecutionSlotSharingGroup, SharedSlot> slots =
            // Here we can control the ExecutionSlotSharingGroup allocation
            // order instead of a random order.
            // For example, a circular order of JobVertexGroups.
            sortExecutionSlotSharingGroup(executionsByGroup.keySet())
                    .stream()
                    .map(group -> getOrAllocateSharedSlot(group, sharedSlotProfileRetriever))
                    .collect(
                            Collectors.toMap(
                                    SharedSlot::getExecutionSlotSharingGroup,
                                    Function.identity()));
    // bla bla
}


private Collection<ExecutionSlotSharingGroup> sortExecutionSlotSharingGroup(
        Collection<ExecutionSlotSharingGroup> executionVertexGroups) {

    Map<Set<JobVertexID>, List<ExecutionSlotSharingGroup>> jobVertexGroups =
            executionVertexGroups.stream()
                    .collect(Collectors.groupingBy(this::getJobVertexSharingGroup));

    List<List<ExecutionSlotSharingGroup>> groups =
            jobVertexGroups.entrySet().stream()
                    .sorted((g1, g2) -> Integer.compare(g2.getKey().size(), g1.getKey().size()))
                    .map(Map.Entry::getValue)
                    .collect(Collectors.toList());

    List<ExecutionSlotSharingGroup> sorted = new ArrayList<>();

    int i = executionVertexGroups.size(), j = 0;
    while (i > 0) {
        List<ExecutionSlotSharingGroup> group = groups.get((j++) % groups.size());
        if (group.isEmpty()) {
            continue;
        }
        i--;
        sorted.add(group.remove(0));
    }
    return sorted;
}


Set<JobVertexID> getJobVertexSharingGroup(ExecutionSlotSharingGroup group) {
    return group.getExecutionVertexIds().stream()
            .map(ExecutionVertexID::getJobVertexId)
            .collect(Collectors.toSet());
}
{code}
After my test, it has the same distribution result as the old solution.  Do you 
think it is feasible?
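The circular ordering inside sortExecutionSlotSharingGroup can be exercised on its own. Below is a self-contained sketch where plain strings stand in for ExecutionSlotSharingGroups; the `CircularGroupOrder` class name and the bucket contents are invented for illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CircularGroupOrder {

    // Drains the buckets round-robin: one element from each non-empty
    // bucket per pass, mirroring the while-loop in the proposal.
    static List<String> sort(List<List<String>> buckets, int total) {
        List<String> sorted = new ArrayList<>();
        int i = total, j = 0;
        while (i > 0) {
            List<String> bucket = buckets.get((j++) % buckets.size());
            if (bucket.isEmpty()) {
                continue;
            }
            i--;
            sorted.add(bucket.remove(0));
        }
        return sorted;
    }

    public static void main(String[] args) {
        // Buckets are already sorted by job-vertex-set size, largest first
        // (two groups per bucket keeps the printout short).
        List<List<String>> buckets = new ArrayList<>(Arrays.asList(
                new ArrayList<>(Arrays.asList("ABCD-0", "ABCD-1")),
                new ArrayList<>(Arrays.asList("BCD-0", "BCD-1")),
                new ArrayList<>(Arrays.asList("CD-0", "CD-1")),
                new ArrayList<>(Arrays.asList("D-0", "D-1"))));
        System.out.println(sort(buckets, 8));
        // [ABCD-0, BCD-0, CD-0, D-0, ABCD-1, BCD-1, CD-1, D-1]
    }
}
```

The interleaved order is what makes the downstream slot requests alternate between vertex groups instead of clustering one group's requests together.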

 

 


was (Author: loyi):
Thanks for your reply [~trohrmann] :)

For DefaultScheduler:

I agree with you that group details shouldn't be put in the SlotRequestId, so I 
found another solution; the idea is below:

DefaultScheduler offers slots to PendingRequests in request order (because 
PendingRequests are stored in a LinkedHashMap), so we can control the 
ExecutionSlotSharingGroup slot-request order to achieve the same goal, and it 
only involves the ExecutionSlotAllocator's implementation.

Here is the concept code:
{code:java}
@Override
public List<SlotExecutionVertexAssignment> allocateSlotsFor(
        List<ExecutionVertexID> executionVertexIds) {

    // bla bla

    Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> executionsByGroup =
            executionVertexIds.stream()
                    .collect(
                            Collectors.groupingBy(
                                    slotSharingStrategy::getExecutionSlotSharingGroup));

    Map<ExecutionSlotSharingGroup, SharedSlot> slots =
            // Here we can control the ExecutionSlotSharingGroup allocation
            // order instead of a random order.
            // For example, a circular order of JobVertexGroups.
            sortExecutionSlotSharingGroup(executionsByGroup.keySet())
                    .stream()
                    .map(group -> getOrAllocateSharedSlot(group, sharedSlotProfileRetriever))
                    .collect(
                            Collectors.toMap(
                                    SharedSlot::getExecutionSlotSharingGroup,
                                    Function.identity()));
    // bla bla
}


private Collection<ExecutionSlotSharingGroup> sortExecutionSlotSharingGroup(
        Collection<ExecutionSlotSharingGroup> executionVertexGroups) {

    Map<Set<JobVertexID>, List<ExecutionSlotSharingGroup>> jobVertexGroups =
            executionVertexGroups.stream()
                    .collect(Collectors.groupingBy(this::getJobVertexSharingGroup));

    List<List<ExecutionSlotSharingGroup>> groups =
            jobVertexGroups.entrySet().stream()
                    .sorted((g1, g2) -> Integer.compare(g2.getKey().size(), g1.getKey().size()))
                    .map(Map.Entry::getValue)
                    .collect(Collectors.toList());

    List<ExecutionSlotSharingGroup> sorted = new ArrayList<>();

    int i = executionVertexGroups.size(), j = 0;
    while (i > 0) {
        List<ExecutionSlotSharingGroup> group = groups.get((j++) % groups.size());
        if (group.isEmpty()) {
            continue;
        }
        i--;
        sorted.add(group.remove(0));
    }
    return sorted;
}



[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly

2021-08-12 Thread loyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17397921#comment-17397921
 ] 

loyi commented on FLINK-23190:
--

Thanks for your reply [~trohrmann] :)

For DefaultScheduler:

I agree with you that group details shouldn't be put in the SlotRequestId, so I 
found another solution; the idea is below:

DefaultScheduler offers slots to PendingRequests in request order (because 
PendingRequests are stored in a LinkedHashMap), so we can control the 
ExecutionSlotSharingGroup slot-request order to achieve the same goal, and it 
only involves the ExecutionSlotAllocator's implementation.

Here is the concept code:
{code:java}
@Override
public List<SlotExecutionVertexAssignment> allocateSlotsFor(
        List<ExecutionVertexID> executionVertexIds) {

    // bla bla

    Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> executionsByGroup =
            executionVertexIds.stream()
                    .collect(
                            Collectors.groupingBy(
                                    slotSharingStrategy::getExecutionSlotSharingGroup));

    Map<ExecutionSlotSharingGroup, SharedSlot> slots =
            // Here we can control the ExecutionSlotSharingGroup allocation
            // order instead of a random order.
            // For example, a circular order of JobVertexGroups.
            sortExecutionSlotSharingGroup(executionsByGroup.keySet())
                    .stream()
                    .map(group -> getOrAllocateSharedSlot(group, sharedSlotProfileRetriever))
                    .collect(
                            Collectors.toMap(
                                    SharedSlot::getExecutionSlotSharingGroup,
                                    Function.identity()));
    // bla bla
}


private Collection<ExecutionSlotSharingGroup> sortExecutionSlotSharingGroup(
        Collection<ExecutionSlotSharingGroup> executionVertexGroups) {

    Map<Set<JobVertexID>, List<ExecutionSlotSharingGroup>> jobVertexGroups =
            executionVertexGroups.stream()
                    .collect(Collectors.groupingBy(this::getJobVertexSharingGroup));

    List<List<ExecutionSlotSharingGroup>> groups =
            jobVertexGroups.entrySet().stream()
                    .sorted((g1, g2) -> Integer.compare(g2.getKey().size(), g1.getKey().size()))
                    .map(Map.Entry::getValue)
                    .collect(Collectors.toList());

    List<ExecutionSlotSharingGroup> sorted = new ArrayList<>();

    int i = executionVertexGroups.size(), j = 0;
    while (i > 0) {
        List<ExecutionSlotSharingGroup> group = groups.get((j++) % groups.size());
        if (group.isEmpty()) {
            continue;
        }
        i--;
        sorted.add(group.remove(0));
    }
    return sorted;
}


Set<JobVertexID> getJobVertexSharingGroup(ExecutionSlotSharingGroup group) {
    return group.getExecutionVertexIds().stream()
            .map(ExecutionVertexID::getJobVertexId)
            .collect(Collectors.toSet());
}
{code}
After my test, it has the same distribution result as the old solution.  Do you 
think it is feasible?

 

 

> Make task-slot allocation much more evenly
> --
>
> Key: FLINK-23190
> URL: https://issues.apache.org/jira/browse/FLINK-23190
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.12.3
>Reporter: loyi
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2021-07-16-10-34-30-700.png
>
>
> FLINK-12122 only guarantees spreading out tasks across the set of TMs which 
> are registered at the time of scheduling, but our jobs all run in active 
> YARN mode, and a job with a smaller source parallelism often causes 
> load-balance issues. 
>  
> For this job:
> {code:java}
> //  -ys 4 means 10 taskmanagers
> env.addSource(...).name("A").setParallelism(10)
>  .map(...).name("B").setParallelism(30)
>  .map(...).name("C").setParallelism(40)
>  .addSink(...).name("D").setParallelism(20);
> {code}
>  
>  Flink-1.12.3 task allocation: 
> ||operator||tm1 ||tm2||tm3||tm4||tm5||tm6||tm7||tm8||tm9||tm10||
> |A| 
> 1|{color:#de350b}2{color}|{color:#de350b}2{color}|1|1|{color:#de350b}3{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|
> |B|3|3|3|3|3|3|3|3|{color:#de350b}2{color}|{color:#de350b}4{color}|
> |C|4|4|4|4|4|4|4|4|4|4|
> |D|2|2|2|2|2|{color:#de350b}1{color}|{color:#de350b}1{color}|2|2|{color:#de350b}4{color}|
>  
> Suggestions:
> When a TaskManager starts registering its slots to the SlotManager, the 
> current processing logic chooses the first pendingSlot that meets its 
> resource requirements.  This "random" strategy usually causes uneven task 
> allocation when the source operator's parallelism is significantly below the 
> processing operators'.  A simple feasible idea is to 
> {color:#de350b}partition{color} the current 
> "{color:#de350b}pendingSlots{color}" by their "JobVertexIds" (e.g. let the 
> AllocationID carry this detail), then allocate the slots proportionally to 
> each JobVertexGroup.
>  
> For above case, the 40 pendingSlots could be divided into 4 groups:
> [ABCD]: 10        // A, B, C, D represent {color:#de350b}jobVertexIds{color}
> [BCD]: 10
> [CD]: 10
> [D]: 10
>  
> 

[jira] [Commented] (FLINK-23595) Allow JSON format deserialize non-numeric numbers

2021-08-09 Thread loyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17396369#comment-17396369
 ] 

loyi commented on FLINK-23595:
--

[~jark]  Hi, I would like to contribute this feature. Can you assign this 
ticket to me?

> Allow JSON format deserialize non-numeric numbers
> -
>
> Key: FLINK-23595
> URL: https://issues.apache.org/jira/browse/FLINK-23595
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.13.1
>Reporter: loyi
>Priority: Major
>
> I am migrating an old flink-stream system to flink-sql; however, an exception 
> occurs when deserializing JSON with *non-numeric (NaN, Infinity)* numbers.  
>  
> {noformat}
> Exception stack:
> Caused by: java.io.IOException: Failed to deserialize JSON 
> '{"uniqueId":"6974697215254697525","monitorKey":"live-log-list","indicatorMap":{"record-retry-rate":NaN,"request-latency-max":-Infinity,"request-latency-avg":NaN,"buffer-available-bytes":3.3554432E7,"waiting-threads":0.0,"record-error-rate":NaN},"tagMap":{"_aggregate":"RAW","host":"live-log-list-001"},"periodInMs":0,"dataTime":1627903774962,"receiveTime":1627903774965}'.
>         at 
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:149)
>  ~[?:?]
>         at 
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:81)
>  ~[?:?]
>         at 
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>         at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:58)
>  ~[?:?]
>         at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
>  ~[?:?]
>         at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>  ~[?:?]
>         at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>  ~[?:?]
>         at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>         at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>         at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> Caused by: 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException:
>  Non-standard token 'NaN': enable 
> JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS to allow
>  at [Source: UNKNOWN; line: 1, column: 310]
>         at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>         at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:710)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>         at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2669)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>         at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextFieldName(UTF8StreamJsonParser.java:1094)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>         at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:268)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>         at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:277)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>         at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:69)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>         at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:16)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>         at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>         at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4635)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>         at 
> 

[jira] [Commented] (FLINK-23595) Allow JSON format deserialize non-numeric numbers

2021-08-09 Thread loyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17395958#comment-17395958
 ] 

loyi commented on FLINK-23595:
--

Hi [~jark], can this feature be supported?
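For context, the Jackson switch that the exception message points at can be enabled as sketched below. This is a minimal sketch, not the Flink patch; Flink bundles a shaded copy under org.apache.flink.shaded.jackson2, but the plain Jackson artifact behaves the same way, and the `NonNumericJson`/`parse` names here are invented for illustration:

```java
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class NonNumericJson {

    // Parses JSON containing NaN/Infinity tokens, which strict JSON forbids;
    // ALLOW_NON_NUMERIC_NUMBERS is the feature named in the stack trace.
    static JsonNode parse(String json) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        mapper.enable(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS);
        return mapper.readTree(json);
    }

    public static void main(String[] args) throws Exception {
        JsonNode node =
                parse("{\"record-retry-rate\": NaN, \"request-latency-max\": -Infinity}");
        System.out.println(node.get("record-retry-rate").isNaN());
    }
}
```

Exposing this as a JSON format option would let the deserializer accept the sample payload from the ticket instead of failing.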

> Allow JSON format deserialize non-numeric numbers
> -
>
> Key: FLINK-23595
> URL: https://issues.apache.org/jira/browse/FLINK-23595
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.13.1
>Reporter: loyi
>Priority: Major
>
> I am migrating an old flink-stream system to flink-sql; however, an exception 
> occurs when deserializing JSON with *non-numeric (NaN, Infinity)* numbers.  
>  
> {noformat}
> Exception stack:
> Caused by: java.io.IOException: Failed to deserialize JSON 
> '{"uniqueId":"6974697215254697525","monitorKey":"live-log-list","indicatorMap":{"record-retry-rate":NaN,"request-latency-max":-Infinity,"request-latency-avg":NaN,"buffer-available-bytes":3.3554432E7,"waiting-threads":0.0,"record-error-rate":NaN},"tagMap":{"_aggregate":"RAW","host":"live-log-list-001"},"periodInMs":0,"dataTime":1627903774962,"receiveTime":1627903774965}'.
>         at 
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:149)
>  ~[?:?]
>         at 
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:81)
>  ~[?:?]
>         at 
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>         at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:58)
>  ~[?:?]
>         at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
>  ~[?:?]
>         at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>  ~[?:?]
>         at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>  ~[?:?]
>         at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>         at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>         at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> Caused by: 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException:
>  Non-standard token 'NaN': enable 
> JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS to allow
>  at [Source: UNKNOWN; line: 1, column: 310]
>         at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>         at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:710)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>         at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2669)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>         at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextFieldName(UTF8StreamJsonParser.java:1094)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>         at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:268)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>         at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:277)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>         at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:69)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>         at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:16)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>         at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>         at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4635)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>         at 
> 

[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly

2021-08-08 Thread loyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17395691#comment-17395691
 ] 

loyi commented on FLINK-23190:
--

[~trohrmann]  Hi, do you have a conclusion on this? 

> Make task-slot allocation much more evenly
> --
>
> Key: FLINK-23190
> URL: https://issues.apache.org/jira/browse/FLINK-23190
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.12.3
>Reporter: loyi
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2021-07-16-10-34-30-700.png
>
>
> FLINK-12122 only guarantees spreading out tasks across the set of TMs which 
> are registered at the time of scheduling, but our jobs all run in active 
> YARN mode, and a job with a smaller source parallelism often causes 
> load-balance issues. 
>  
> For this job:
> {code:java}
> //  -ys 4 means 10 taskmanagers
> env.addSource(...).name("A").setParallelism(10)
>  .map(...).name("B").setParallelism(30)
>  .map(...).name("C").setParallelism(40)
>  .addSink(...).name("D").setParallelism(20);
> {code}
>  
>  Flink-1.12.3 task allocation: 
> ||operator||tm1 ||tm2||tm3||tm4||tm5||tm6||tm7||tm8||tm9||tm10||
> |A| 
> 1|{color:#de350b}2{color}|{color:#de350b}2{color}|1|1|{color:#de350b}3{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|
> |B|3|3|3|3|3|3|3|3|{color:#de350b}2{color}|{color:#de350b}4{color}|
> |C|4|4|4|4|4|4|4|4|4|4|
> |D|2|2|2|2|2|{color:#de350b}1{color}|{color:#de350b}1{color}|2|2|{color:#de350b}4{color}|
>  
> Suggestions:
> When a TaskManager starts registering its slots to the SlotManager, the 
> current processing logic chooses the first pendingSlot that meets its 
> resource requirements.  This "random" strategy usually causes uneven task 
> allocation when the source operator's parallelism is significantly below the 
> processing operators'.  A simple feasible idea is to 
> {color:#de350b}partition{color} the current 
> "{color:#de350b}pendingSlots{color}" by their "JobVertexIds" (e.g. let the 
> AllocationID carry this detail), then allocate the slots proportionally to 
> each JobVertexGroup.
>  
> For above case, the 40 pendingSlots could be divided into 4 groups:
> [ABCD]: 10        // A, B, C, D represent {color:#de350b}jobVertexIds{color}
> [BCD]: 10
> [CD]: 10
> [D]: 10
>  
> Every TaskManager provides 4 slots at a time, and each group gets 1 slot 
> according to its proportion (1/4); the final allocation result is below:
> [ABCD]: deployed on 10 different TaskManagers
> [BCD]: deployed on 10 different TaskManagers
> [CD]: deployed on 10 different TaskManagers
> [D]: deployed on 10 different TaskManagers
>  
> I have implemented a [concept 
> code|https://github.com/saddays/flink/commit/dc82e60a7c7599fbcb58c14f8e3445bc8d07ace1]
>  based on Flink-1.12.3; the patched version achieves {color:#de350b}fully 
> even{color} task allocation and works well on my workload.  Are there other 
> points that have not been considered, or does it conflict with future plans? 
> Sorry for my poor English.
>  
>  





[jira] [Commented] (FLINK-23645) Sqlclient doesn't response CTRL-C correctly before Executor ResultView opening.

2021-08-05 Thread loyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17394441#comment-17394441
 ] 

loyi commented on FLINK-23645:
--

[~jark]  Yes, please assign it to me.

> Sqlclient doesn't response CTRL-C correctly before Executor ResultView 
> opening.
> ---
>
> Key: FLINK-23645
> URL: https://issues.apache.org/jira/browse/FLINK-23645
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.13.1
>Reporter: loyi
>Priority: Minor
> Attachments: image-2021-08-05-18-31-50-487.png, 
> image-2021-08-05-18-44-08-453.png
>
>
> If we press Ctrl-C before the *ResultView* is opened, the terminal exits 
> without any prompt.
>  For example:
> !image-2021-08-05-18-31-50-487.png!
> After analysis, I found that the SQL client doesn't register a 
> *Signal.INT* handler before *callOperation*, so the JVM default handler 
> executes and exits.
>  
> Suggestion:
> We could simply interrupt the Executor when we receive *Signal.INT*:
>  
> {code:java}
> private void callOperation(Operation operation, ExecutionMode mode) {
> validate(operation, mode);
> final Thread thread = Thread.currentThread();
> final Terminal.SignalHandler previousHandler =
> terminal.handle(Terminal.Signal.INT, (signal) -> 
> thread.interrupt());
> try {
> if (operation instanceof QuitOperation) {
> // QUIT/EXIT
> callQuit();
> } else if (operation instanceof ClearOperation) {
> // CLEAR
> callClear();
> } else if (operation instanceof HelpOperation) {
> // HELP
> callHelp();
> } else if (operation instanceof SetOperation) {
> // SET
> callSet((SetOperation) operation);
> } else if (operation instanceof ResetOperation) {
> // RESET
> callReset((ResetOperation) operation);
> } else if (operation instanceof CatalogSinkModifyOperation) {
> // INSERT INTO/OVERWRITE
> callInsert((CatalogSinkModifyOperation) operation);
> } else if (operation instanceof QueryOperation) {
> // SELECT
> callSelect((QueryOperation) operation);
> } else if (operation instanceof ExplainOperation) {
> // EXPLAIN
> callExplain((ExplainOperation) operation);
> } else if (operation instanceof BeginStatementSetOperation) {
> // BEGIN STATEMENT SET
> callBeginStatementSet();
> } else if (operation instanceof EndStatementSetOperation) {
> // END
> callEndStatementSet();
> } else {
> // fallback to default implementation
> executeOperation(operation);
> }
> } finally {
> terminal.handle(Terminal.Signal.INT, previousHandler);
>}
> }{code}
>  
> After fixed:
> !image-2021-08-05-18-44-08-453.png!  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23645) Sqlclient doesn't response CTRL-C correctly before Executor ResultView opening.

2021-08-05 Thread loyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

loyi updated FLINK-23645:
-
Description: 
If we press Ctrl-C before the *ResultView* is opened, the terminal exits 
without any prompt.

 For example:

!image-2021-08-05-18-31-50-487.png!

After analysis, I found that the SQL client doesn't register a 
*Signal.INT* handler before *callOperation*, so the JVM default handler 
executes and exits.

 

Suggestion:

We could simply interrupt the Executor when we receive *Signal.INT*:

 
{code:java}
private void callOperation(Operation operation, ExecutionMode mode) {
validate(operation, mode);
final Thread thread = Thread.currentThread();
final Terminal.SignalHandler previousHandler =
terminal.handle(Terminal.Signal.INT, (signal) -> 
thread.interrupt());
try {
if (operation instanceof QuitOperation) {
// QUIT/EXIT
callQuit();
} else if (operation instanceof ClearOperation) {
// CLEAR
callClear();
} else if (operation instanceof HelpOperation) {
// HELP
callHelp();
} else if (operation instanceof SetOperation) {
// SET
callSet((SetOperation) operation);
} else if (operation instanceof ResetOperation) {
// RESET
callReset((ResetOperation) operation);
} else if (operation instanceof CatalogSinkModifyOperation) {
// INSERT INTO/OVERWRITE
callInsert((CatalogSinkModifyOperation) operation);
} else if (operation instanceof QueryOperation) {
// SELECT
callSelect((QueryOperation) operation);
} else if (operation instanceof ExplainOperation) {
// EXPLAIN
callExplain((ExplainOperation) operation);
} else if (operation instanceof BeginStatementSetOperation) {
// BEGIN STATEMENT SET
callBeginStatementSet();
} else if (operation instanceof EndStatementSetOperation) {
// END
callEndStatementSet();
} else {
// fallback to default implementation
executeOperation(operation);
}
} finally {
terminal.handle(Terminal.Signal.INT, previousHandler);
   }
}{code}
 

After fixed:

!image-2021-08-05-18-44-08-453.png!  

  was:
If we press Ctrl-C before the *ResultView* is opened, the terminal exits 
without any prompt.

 For example:

!image-2021-08-05-18-31-50-487.png!


After analysis, I found that the SQL client doesn't register a 
*Signal.INT* handler before *callOperation*, so the JVM default handler 
executes and exits.

 

Suggestion:

We could simply interrupt the Executor when we receive *Signal.INT*:

 
{code:java}
private void callOperation(Operation operation, ExecutionMode mode) {
validate(operation, mode);
final Thread thread = Thread.currentThread();
final Terminal.SignalHandler previousHandler =
terminal.handle(Terminal.Signal.INT, (signal) -> 
thread.interrupt());
try {
if (operation instanceof QuitOperation) {
// QUIT/EXIT
callQuit();
} else if (operation instanceof ClearOperation) {
// CLEAR
callClear();
} else if (operation instanceof HelpOperation) {
// HELP
callHelp();
} else if (operation instanceof SetOperation) {
// SET
callSet((SetOperation) operation);
} else if (operation instanceof ResetOperation) {
// RESET
callReset((ResetOperation) operation);
} else if (operation instanceof CatalogSinkModifyOperation) {
// INSERT INTO/OVERWRITE
callInsert((CatalogSinkModifyOperation) operation);
} else if (operation instanceof QueryOperation) {
// SELECT
callSelect((QueryOperation) operation);
} else if (operation instanceof ExplainOperation) {
// EXPLAIN
callExplain((ExplainOperation) operation);
} else if (operation instanceof BeginStatementSetOperation) {
// BEGIN STATEMENT SET
callBeginStatementSet();
} else if (operation instanceof EndStatementSetOperation) {
// END
callEndStatementSet();
} else {
// fallback to default implementation
executeOperation(operation);
}
} finally {
terminal.handle(Terminal.Signal.INT, previousHandler);
   }
}{code}

 


> Sqlclient doesn't response CTRL-C correctly before Executor ResultView 
> opening.
> ---
>
> Key: FLINK-23645
> URL: https://issues.apache.org/jira/browse/FLINK-23645
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>

[jira] [Updated] (FLINK-23645) Sqlclient doesn't response CTRL-C correctly before Executor ResultView opening.

2021-08-05 Thread loyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

loyi updated FLINK-23645:
-
Attachment: image-2021-08-05-18-44-08-453.png

> Sqlclient doesn't response CTRL-C correctly before Executor ResultView 
> opening.
> ---
>
> Key: FLINK-23645
> URL: https://issues.apache.org/jira/browse/FLINK-23645
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.13.1
>Reporter: loyi
>Priority: Minor
> Attachments: image-2021-08-05-18-31-50-487.png, 
> image-2021-08-05-18-44-08-453.png
>
>
> If we press Ctrl-C before the *ResultView* is opened, the terminal exits 
> without any prompt.
>  For example:
> !image-2021-08-05-18-31-50-487.png!
>  After analysis, I found that the SQL client doesn't register a 
> *Signal.INT* handler before *callOperation*, so the JVM default handler 
> executes and exits.
>  
> Suggestion:
> We could simply interrupt the Executor when we receive *Signal.INT*:
>  
> {code:java}
> private void callOperation(Operation operation, ExecutionMode mode) {
> validate(operation, mode);
> final Thread thread = Thread.currentThread();
> final Terminal.SignalHandler previousHandler =
> terminal.handle(Terminal.Signal.INT, (signal) -> 
> thread.interrupt());
> try {
> if (operation instanceof QuitOperation) {
> // QUIT/EXIT
> callQuit();
> } else if (operation instanceof ClearOperation) {
> // CLEAR
> callClear();
> } else if (operation instanceof HelpOperation) {
> // HELP
> callHelp();
> } else if (operation instanceof SetOperation) {
> // SET
> callSet((SetOperation) operation);
> } else if (operation instanceof ResetOperation) {
> // RESET
> callReset((ResetOperation) operation);
> } else if (operation instanceof CatalogSinkModifyOperation) {
> // INSERT INTO/OVERWRITE
> callInsert((CatalogSinkModifyOperation) operation);
> } else if (operation instanceof QueryOperation) {
> // SELECT
> callSelect((QueryOperation) operation);
> } else if (operation instanceof ExplainOperation) {
> // EXPLAIN
> callExplain((ExplainOperation) operation);
> } else if (operation instanceof BeginStatementSetOperation) {
> // BEGIN STATEMENT SET
> callBeginStatementSet();
> } else if (operation instanceof EndStatementSetOperation) {
> // END
> callEndStatementSet();
> } else {
> // fallback to default implementation
> executeOperation(operation);
> }
> } finally {
> terminal.handle(Terminal.Signal.INT, previousHandler);
>}
> }{code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23595) Allow JSON format deserialize non-numeric numbers

2021-08-05 Thread loyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

loyi updated FLINK-23595:
-
Description: 
I am migrating an old flink-stream system to flink-sql; however, an exception 
occurs when deserializing JSON with *non-numeric (NaN, Infinity)* numbers.  

 
{noformat}
Exception stack:

Caused by: java.io.IOException: Failed to deserialize JSON 
'{"uniqueId":"6974697215254697525","monitorKey":"live-log-list","indicatorMap":{"record-retry-rate":NaN,"request-latency-max":-Infinity,"request-latency-avg":NaN,"buffer-available-bytes":3.3554432E7,"waiting-threads":0.0,"record-error-rate":NaN},"tagMap":{"_aggregate":"RAW","host":"live-log-list-001"},"periodInMs":0,"dataTime":1627903774962,"receiveTime":1627903774965}'.
        at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:149)
 ~[?:?]
        at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:81)
 ~[?:?]
        at 
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:58)
 ~[?:?]
        at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
 ~[?:?]
        at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
 ~[?:?]
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
 ~[?:?]
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) 
~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
Caused by: 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: 
Non-standard token 'NaN': enable JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS 
to allow
 at [Source: UNKNOWN; line: 1, column: 310]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:710)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2669)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextFieldName(UTF8StreamJsonParser.java:1094)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:268)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:277)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:69)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:16)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4635)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:3056)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:142)
 ~[?:?]
        at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:81)
 ~[?:?]
        at 
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 

[jira] [Updated] (FLINK-23595) Allow JSON format deserialize non-numeric numbers

2021-08-05 Thread loyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

loyi updated FLINK-23595:
-
Environment: (was: {noformat}
 {noformat})

> Allow JSON format deserialize non-numeric numbers
> -
>
> Key: FLINK-23595
> URL: https://issues.apache.org/jira/browse/FLINK-23595
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.13.1
>Reporter: loyi
>Priority: Major
>
> I am migrating an old flink-stream system to flink-sql; however, an exception 
> occurs when deserializing JSON with *non-numeric (NaN, Infinity)* numbers.  
>  
> Suggestion:
> I notice that *JsonRowDataDeserializationSchema* uses Jackson as the 
> default implementation, and Jackson doesn't enable parsing non-numeric numbers 
> by default. For backward compatibility, we could add an option 
> `allow-non-numeric-numbers` to enable this feature.
>  
>  
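For reference, the Jackson feature this suggestion refers to can be toggled on a plain ObjectMapper as below. This is a minimal sketch against stock Jackson (the class name `NonNumericJson` is mine); Flink shades Jackson under `org.apache.flink.shaded.jackson2`, and the proposed `allow-non-numeric-numbers` option would flip the same parser flag.

```java
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class NonNumericJson {

    // Parse JSON that may contain NaN / Infinity / -Infinity tokens.
    static JsonNode parse(String json) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // Without this, readTree throws JsonParseException on the 'NaN' token,
        // which is exactly the failure in the stack trace above.
        mapper.enable(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS);
        return mapper.readTree(json);
    }

    public static void main(String[] args) throws Exception {
        JsonNode node = parse(
                "{\"record-retry-rate\":NaN,\"request-latency-max\":-Infinity}");
        System.out.println(node);
    }
}
```

With the feature enabled, `NaN` parses to a double node holding `Double.NaN` instead of raising `JsonParseException`.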



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23595) Allow JSON format deserialize non-numeric numbers

2021-08-05 Thread loyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

loyi updated FLINK-23595:
-
Description: 
I am migrating an old flink-stream system to flink-sql; however, an exception 
occurs when deserializing JSON with *non-numeric (NaN, Infinity)* numbers.  

 
{noformat}
Exception stack:

Caused by: java.io.IOException: Failed to deserialize JSON 
'{"uniqueId":"6974697215254697525","monitorKey":"live-log-list","indicatorMap":{"record-retry-rate":NaN,"request-latency-max":-Infinity,"request-latency-avg":NaN,"buffer-available-bytes":3.3554432E7,"waiting-threads":0.0,"record-error-rate":NaN},"tagMap":{"_aggregate":"RAW","host":"live-log-list-001"},"periodInMs":0,"dataTime":1627903774962,"receiveTime":1627903774965}'.
        at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:149)
 ~[?:?]
        at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:81)
 ~[?:?]
        at 
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:58)
 ~[?:?]
        at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
 ~[?:?]
        at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
 ~[?:?]
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
 ~[?:?]
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) 
~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
Caused by: 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: 
Non-standard token 'NaN': enable JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS 
to allow
 at [Source: UNKNOWN; line: 1, column: 310]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:710)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2669)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextFieldName(UTF8StreamJsonParser.java:1094)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:268)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:277)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:69)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:16)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4635)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:3056)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:142)
 ~[?:?]
        at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:81)
 ~[?:?]
        at 
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 

[jira] [Updated] (FLINK-23595) Allow JSON format deserialize non-numeric numbers

2021-08-05 Thread loyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

loyi updated FLINK-23595:
-
Environment: 
{noformat}
 {noformat}

  was:
{noformat}
Exception stack:

Caused by: java.io.IOException: Failed to deserialize JSON 
'{"uniqueId":"6974697215254697525","monitorKey":"live-log-list","indicatorMap":{"record-retry-rate":NaN,"request-latency-max":-Infinity,"request-latency-avg":NaN,"buffer-available-bytes":3.3554432E7,"waiting-threads":0.0,"record-error-rate":NaN},"tagMap":{"_aggregate":"RAW","host":"live-log-list-001"},"periodInMs":0,"dataTime":1627903774962,"receiveTime":1627903774965}'.
        at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:149)
 ~[?:?]
        at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:81)
 ~[?:?]
        at 
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:58)
 ~[?:?]
        at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
 ~[?:?]
        at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
 ~[?:?]
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
 ~[?:?]
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) 
~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
Caused by: 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: 
Non-standard token 'NaN': enable JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS 
to allow
 at [Source: UNKNOWN; line: 1, column: 310]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:710)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2669)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextFieldName(UTF8StreamJsonParser.java:1094)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:268)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:277)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:69)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:16)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4635)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:3056)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:142)
 ~[?:?]
        at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:81)
 ~[?:?]
        at 
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:58)
 ~[?:?]
        at 

[jira] [Updated] (FLINK-23645) Sqlclient doesn't response CTRL-C correctly before Executor ResultView opening.

2021-08-05 Thread loyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

loyi updated FLINK-23645:
-
Description: 
If we press Ctrl-C before the *ResultView* is opened, the terminal exits 
without any prompt.

 For example:

!image-2021-08-05-18-31-50-487.png!


After analysis, I found that the SQL client doesn't register a 
*Signal.INT* handler before *callOperation*, so the JVM default handler 
executes and exits.

 

Suggestion:

We could simply interrupt the Executor when we receive *Signal.INT*:

 
{code:java}
private void callOperation(Operation operation, ExecutionMode mode) {
validate(operation, mode);
final Thread thread = Thread.currentThread();
final Terminal.SignalHandler previousHandler =
terminal.handle(Terminal.Signal.INT, (signal) -> 
thread.interrupt());
try {
if (operation instanceof QuitOperation) {
// QUIT/EXIT
callQuit();
} else if (operation instanceof ClearOperation) {
// CLEAR
callClear();
} else if (operation instanceof HelpOperation) {
// HELP
callHelp();
} else if (operation instanceof SetOperation) {
// SET
callSet((SetOperation) operation);
} else if (operation instanceof ResetOperation) {
// RESET
callReset((ResetOperation) operation);
} else if (operation instanceof CatalogSinkModifyOperation) {
// INSERT INTO/OVERWRITE
callInsert((CatalogSinkModifyOperation) operation);
} else if (operation instanceof QueryOperation) {
// SELECT
callSelect((QueryOperation) operation);
} else if (operation instanceof ExplainOperation) {
// EXPLAIN
callExplain((ExplainOperation) operation);
} else if (operation instanceof BeginStatementSetOperation) {
// BEGIN STATEMENT SET
callBeginStatementSet();
} else if (operation instanceof EndStatementSetOperation) {
// END
callEndStatementSet();
} else {
// fallback to default implementation
executeOperation(operation);
}
} finally {
terminal.handle(Terminal.Signal.INT, previousHandler);
   }
}{code}

 

  was:
If we press Ctrl-C before the *ResultView* is opened, the terminal exits 
without any prompt.

 For example:
{noformat}
Flink SQL> select 1;
^C
[bot@ bin]${noformat}
After analysis, I found that the SQL client doesn't register a 
*Signal.INT* handler before *callOperation*, so the JVM default handler 
executes and exits.

 

Suggestion:

We could simply interrupt the Executor when we receive *Signal.INT*:

 
{code:java}
private void callOperation(Operation operation, ExecutionMode mode) {
validate(operation, mode);
final Thread thread = Thread.currentThread();
final Terminal.SignalHandler previousHandler =
terminal.handle(Terminal.Signal.INT, (signal) -> 
thread.interrupt());
try {
if (operation instanceof QuitOperation) {
// QUIT/EXIT
callQuit();
} else if (operation instanceof ClearOperation) {
// CLEAR
callClear();
} else if (operation instanceof HelpOperation) {
// HELP
callHelp();
} else if (operation instanceof SetOperation) {
// SET
callSet((SetOperation) operation);
} else if (operation instanceof ResetOperation) {
// RESET
callReset((ResetOperation) operation);
} else if (operation instanceof CatalogSinkModifyOperation) {
// INSERT INTO/OVERWRITE
callInsert((CatalogSinkModifyOperation) operation);
} else if (operation instanceof QueryOperation) {
// SELECT
callSelect((QueryOperation) operation);
} else if (operation instanceof ExplainOperation) {
// EXPLAIN
callExplain((ExplainOperation) operation);
} else if (operation instanceof BeginStatementSetOperation) {
// BEGIN STATEMENT SET
callBeginStatementSet();
} else if (operation instanceof EndStatementSetOperation) {
// END
callEndStatementSet();
} else {
// fallback to default implementation
executeOperation(operation);
}
} finally {
terminal.handle(Terminal.Signal.INT, previousHandler);
   }
}{code}

 


> Sqlclient doesn't response CTRL-C correctly before Executor ResultView 
> opening.
> ---
>
> Key: FLINK-23645
> URL: https://issues.apache.org/jira/browse/FLINK-23645
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.13.1
>

[jira] [Updated] (FLINK-23645) Sqlclient doesn't response CTRL-C correctly before Executor ResultView opening.

2021-08-05 Thread loyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

loyi updated FLINK-23645:
-
Description: 
If we press Ctrl-C before the *ResultView* is opened, the terminal exits 
without any prompt.

 For example:
{noformat}
Flink SQL> select 1;
^C
[bot@ bin]${noformat}
After analysis, I found that the SQL client doesn't register a 
*Signal.INT* handler before *callOperation*, so the JVM default handler 
executes and exits.

 

Suggestion:

We could simply interrupt the Executor when we receive *Signal.INT*:

 
{code:java}
private void callOperation(Operation operation, ExecutionMode mode) {
validate(operation, mode);
final Thread thread = Thread.currentThread();
final Terminal.SignalHandler previousHandler =
terminal.handle(Terminal.Signal.INT, (signal) -> 
thread.interrupt());
try {
if (operation instanceof QuitOperation) {
// QUIT/EXIT
callQuit();
} else if (operation instanceof ClearOperation) {
// CLEAR
callClear();
} else if (operation instanceof HelpOperation) {
// HELP
callHelp();
} else if (operation instanceof SetOperation) {
// SET
callSet((SetOperation) operation);
} else if (operation instanceof ResetOperation) {
// RESET
callReset((ResetOperation) operation);
} else if (operation instanceof CatalogSinkModifyOperation) {
// INSERT INTO/OVERWRITE
callInsert((CatalogSinkModifyOperation) operation);
} else if (operation instanceof QueryOperation) {
// SELECT
callSelect((QueryOperation) operation);
} else if (operation instanceof ExplainOperation) {
// EXPLAIN
callExplain((ExplainOperation) operation);
} else if (operation instanceof BeginStatementSetOperation) {
// BEGIN STATEMENT SET
callBeginStatementSet();
} else if (operation instanceof EndStatementSetOperation) {
// END
callEndStatementSet();
} else {
// fallback to default implementation
executeOperation(operation);
}
} finally {
terminal.handle(Terminal.Signal.INT, previousHandler);
   }
}{code}

 

  was:
If we press Ctrl-C before the *ResultView* is opened, the terminal exits 
without any prompt.

 For example:
{noformat}
Flink SQL> select 1;
^C
[bot@ bin]${noformat}{noformat}
After analysis, I found that the SQL client doesn't register a 
*Signal.INT* handler before *callOperation*, so the JVM default handler 
executes and exits.

 

Suggestion:

We could simply interrupt the Executor when we receive *Signal.INT*:

 
{code:java}
private void callOperation(Operation operation, ExecutionMode mode) {
validate(operation, mode);
final Thread thread = Thread.currentThread();
final Terminal.SignalHandler previousHandler =
terminal.handle(Terminal.Signal.INT, (signal) -> 
thread.interrupt());
try {
if (operation instanceof QuitOperation) {
// QUIT/EXIT
callQuit();
} else if (operation instanceof ClearOperation) {
// CLEAR
callClear();
} else if (operation instanceof HelpOperation) {
// HELP
callHelp();
} else if (operation instanceof SetOperation) {
// SET
callSet((SetOperation) operation);
} else if (operation instanceof ResetOperation) {
// RESET
callReset((ResetOperation) operation);
} else if (operation instanceof CatalogSinkModifyOperation) {
// INSERT INTO/OVERWRITE
callInsert((CatalogSinkModifyOperation) operation);
} else if (operation instanceof QueryOperation) {
// SELECT
callSelect((QueryOperation) operation);
} else if (operation instanceof ExplainOperation) {
// EXPLAIN
callExplain((ExplainOperation) operation);
} else if (operation instanceof BeginStatementSetOperation) {
// BEGIN STATEMENT SET
callBeginStatementSet();
} else if (operation instanceof EndStatementSetOperation) {
// END
callEndStatementSet();
} else {
// fallback to default implementation
executeOperation(operation);
}
} finally {
terminal.handle(Terminal.Signal.INT, previousHandler);
   }
}{code}
**

 


> Sqlclient doesn't response CTRL-C correctly before Executor ResultView 
> opening.
> ---
>
> Key: FLINK-23645
> URL: https://issues.apache.org/jira/browse/FLINK-23645
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>

[jira] [Updated] (FLINK-23645) Sqlclient doesn't response CTRL-C correctly before Executor ResultView opening.

2021-08-05 Thread loyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

loyi updated FLINK-23645:
-
Attachment: image-2021-08-05-18-31-50-487.png

> Sqlclient doesn't response CTRL-C correctly before Executor ResultView 
> opening.
> ---
>
> Key: FLINK-23645
> URL: https://issues.apache.org/jira/browse/FLINK-23645
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.13.1
>Reporter: loyi
>Priority: Minor
> Attachments: image-2021-08-05-18-31-50-487.png
>
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23645) Sqlclient doesn't response CTRL-C correctly before Executor ResultView opening.

2021-08-05 Thread loyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

loyi updated FLINK-23645:
-
Description: 

[jira] [Updated] (FLINK-23645) Sqlclient doesn't response CTRL-C correctly before Executor ResultView opening.

2021-08-05 Thread loyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

loyi updated FLINK-23645:
-
Description: 
[jira] [Updated] (FLINK-23645) Sqlclient doesn't response CTRL-C correctly before Executor ResultView opening.

2021-08-05 Thread loyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

loyi updated FLINK-23645:
-
Description: 

[jira] [Created] (FLINK-23645) Sqlclient doesn't response CTRL-C correctly before Executor ResultView opening.

2021-08-05 Thread loyi (Jira)
loyi created FLINK-23645:


 Summary: Sqlclient doesn't response CTRL-C correctly before 
Executor ResultView opening.
 Key: FLINK-23645
 URL: https://issues.apache.org/jira/browse/FLINK-23645
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client
Affects Versions: 1.13.1
Reporter: loyi


If we press Ctrl-C before the *ResultView* is opened, the terminal exits 
without any prompt.

 

 
{noformat}
Flink SQL> select 1;
^C
[bot@ bin]${noformat}
 





[jira] [Created] (FLINK-23595) Allow JSON format deserialize non-numeric numbers

2021-08-03 Thread loyi (Jira)
loyi created FLINK-23595:


 Summary: Allow JSON format deserialize non-numeric numbers
 Key: FLINK-23595
 URL: https://issues.apache.org/jira/browse/FLINK-23595
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.13.1
 Environment: {noformat}
Exception stack:

Caused by: java.io.IOException: Failed to deserialize JSON 
'{"uniqueId":"6974697215254697525","monitorKey":"live-log-list","indicatorMap":{"record-retry-rate":NaN,"request-latency-max":-Infinity,"request-latency-avg":NaN,"buffer-available-bytes":3.3554432E7,"waiting-threads":0.0,"record-error-rate":NaN},"tagMap":{"_aggregate":"RAW","host":"live-log-list-001"},"periodInMs":0,"dataTime":1627903774962,"receiveTime":1627903774965}'.
        at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:149)
 ~[?:?]
        at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:81)
 ~[?:?]
        at 
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:58)
 ~[?:?]
        at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
 ~[?:?]
        at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
 ~[?:?]
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
 ~[?:?]
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) 
~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
Caused by: 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: 
Non-standard token 'NaN': enable JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS 
to allow
 at [Source: UNKNOWN; line: 1, column: 310]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:710)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2669)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextFieldName(UTF8StreamJsonParser.java:1094)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:268)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:277)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:69)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:16)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4635)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:3056)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:142)
 ~[?:?]
        at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:81)
 ~[?:?]
        at 
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 

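The stack trace above names the fix: Jackson rejects the non-standard token `NaN` unless `JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS` is enabled. As a language-neutral illustration of lenient versus strict handling of non-numeric number tokens, Python's stdlib `json` accepts `NaN`/`-Infinity` by default, and `parse_constant` can emulate the strict behavior. This sketch is illustrative only, not Flink or Jackson code:

```python
import json
import math

payload = '{"record-retry-rate": NaN, "request-latency-max": -Infinity}'

# Lenient parsing: the non-standard tokens become Python floats.
lenient = json.loads(payload)
assert math.isnan(lenient["record-retry-rate"])
assert lenient["request-latency-max"] == float("-inf")


# Strict parsing: reject non-standard tokens, as Jackson does by default.
def reject(token):
    raise ValueError(f"Non-standard token {token!r}")


def strict_loads(text):
    return json.loads(text, parse_constant=reject)
```

Enabling the corresponding lenient mode in the JSON format would let records like the monitoring payload above deserialize instead of failing the whole job.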
[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly

2021-07-20 Thread loyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17384608#comment-17384608
 ] 

loyi commented on FLINK-23190:
--

[~trohrmann]  Hello, should I provide more results? Do these prove that the 
solution is feasible?

Looking forward to your reply.

> Make task-slot allocation much more evenly
> --
>
> Key: FLINK-23190
> URL: https://issues.apache.org/jira/browse/FLINK-23190
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.12.3
>Reporter: loyi
>Priority: Major
> Attachments: image-2021-07-16-10-34-30-700.png
>
>
> FLINK-12122 only guarantees spreading out tasks across the set of TMs that 
> are registered at the time of scheduling, but our jobs all run in 
> active YARN mode, and a job with a smaller source parallelism often causes 
> load-balance issues. 
>  
> For this job:
> {code:java}
> //  -ys 4 means 10 taskmanagers
> env.addSource(...).name("A").setParallelism(10).
>  map(...).name("B").setParallelism(30)
>  .map(...).name("C").setParallelism(40)
>  .addSink(...).name("D").setParallelism(20);
> {code}
>  
>  Flink-1.12.3 task allocation: 
> ||operator||tm1 ||tm2||tm3||tm4||tm5||5m6||tm7||tm8||tm9||tm10||
> |A| 
> 1|{color:#de350b}2{color}|{color:#de350b}2{color}|1|1|{color:#de350b}3{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|
> |B|3|3|3|3|3|3|3|3|{color:#de350b}2{color}|{color:#de350b}4{color}|
> |C|4|4|4|4|4|4|4|4|4|4|
> |D|2|2|2|2|2|{color:#de350b}1{color}|{color:#de350b}1{color}|2|2|{color:#de350b}4{color}|
>  
> Suggestions:
> When TaskManagers start registering slots with the SlotManager, the current 
> processing logic chooses the first pendingSlot that meets the resource 
> requirements. This "random" strategy usually causes uneven task allocation 
> when a source operator's parallelism is significantly below a processing 
> operator's. A simple, feasible idea is to {color:#de350b}partition{color} the 
> current "{color:#de350b}pendingSlots{color}" by their "JobVertexIds" (e.g. by 
> letting the AllocationID carry that detail), then allocate the slots 
> proportionally to each JobVertexGroup.
>  
> For the above case, the 40 pendingSlots could be divided into 4 groups:
> [ABCD]: 10        // A, B, C, D represent {color:#de350b}jobVertexId{color}s
> [BCD]: 10
> [CD]: 10
> [D]: 10
>  
> Every TaskManager provides 4 slots at a time, and each group gets 1 slot 
> according to its proportion (1/4); the final allocation result is below:
> [ABCD]: deployed on 10 different taskmanagers
> [BCD]: deployed on 10 different taskmanagers
> [CD]: deployed on 10 different taskmanagers
> [D]: deployed on 10 different taskmanagers
>  
> I have implemented a [concept 
> code|https://github.com/saddays/flink/commit/dc82e60a7c7599fbcb58c14f8e3445bc8d07ace1]
>  based on Flink-1.12.3; the patched version achieves {color:#de350b}fully 
> even{color} task allocation and works well on my workload. Are there 
> other points that have not been considered, or does it conflict with future 
> plans? Sorry for my poor English.
>  
>  

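The quoted proposal can be modeled with a small sketch: partition the pending slots into groups by the set of job vertices they will host, then hand out each TaskManager's slots round-robin across the groups instead of first-fit. `allocate_evenly` is a hypothetical toy model, not the actual SlotManager logic:

```python
from collections import Counter
from itertools import cycle


def allocate_evenly(pending_groups, num_tms, slots_per_tm):
    """Toy model of the proposed strategy.

    `pending_groups` maps a group label (e.g. "ABCD", the set of job
    vertices a slot will host) to its pending-slot count. Each
    TaskManager's slots are handed out round-robin across the groups,
    so every group receives its proportional share per TaskManager.
    """
    placement = {g: Counter() for g in pending_groups}
    remaining = dict(pending_groups)
    group_cycle = cycle(list(pending_groups))
    for tm in range(num_tms):
        for _ in range(slots_per_tm):
            # Pick the next group that still needs a slot.
            for _ in range(len(pending_groups)):
                g = next(group_cycle)
                if remaining[g] > 0:
                    remaining[g] -= 1
                    placement[g][tm] += 1
                    break
    return placement
```

For the quoted example ([ABCD]/[BCD]/[CD]/[D], 10 slots each, 10 TaskManagers with 4 slots), every group lands exactly once on every TaskManager, matching the fully even result described above.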




[jira] [Comment Edited] (FLINK-23190) Make task-slot allocation much more evenly

2021-07-16 Thread loyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17381794#comment-17381794
 ] 

loyi edited comment on FLINK-23190 at 7/16/21, 6:06 AM:


[~trohrmann]  Thanks for your patience. I have implemented a version based on 
flink-1.13.1, and I will use two real examples for comparison below.
h2. Example 1:

This job can achieve a completely even allocation in theory.



 
{code:java}
// job description
StreamExecutionEnvironment env;
 env.addSource(...).name("a").setParallelism(10).
 map(...).name("b").setParallelism(30)
 .map(...).name("c").setParallelism(40)
 .addSink(...).name("d").setParallelism(20);

//We specify -ys 4 , that means the job will apply for 10 TaskExecutors.
 flink run -yjm 2048 -ytm 2048 -ys 4 -m yarn-cluster -c 
com.loyi.flink.demo.Bootstrap -d /home/deploy/flink-demo.jar
{code}

 !image-2021-07-16-10-34-30-700.png|width=1042,height=219!

 

 

*flink-1.13.1-official*(default config) operator's subtask distribution:
||Operator||TM-1||TM-2||TM-3||TM-4||TM-5||TM-6||TM-7||TM-8||TM-9||TM-10||
|a|0|1|2|0|0|1|2|2|2|0|
|b|4|1|3|3|4|4|3|3|3|2|
|c|4|4|4|4|4|4|4|4|4|4|
|d|3|1|3|1|4|2|2|2|0|2|

 

*flink-1.13.1-official*(with cluster.evenly-spread-out-slots: true) operator's 
subtask distribution:

 
||Operator||TM-1||TM-2||TM-3||TM-4||TM-5||TM-6||TM-7||TM-8||TM-9||TM-10||
|a|1|1|0|2|0|3|1|0|0|2|
|b|4|1|3|4|4|3|4|2|3|2|
|c|4|4|4|4|4|4|4|4|4|4|
|d|3|1|3|2|3|2|2|1|2|1|

 

*flink-1.13.1-patch*(default config) operator's subtask distribution:
||Operator||TM-1||TM-2||TM-3||TM-4||TM-5||TM-6||TM-7||TM-8||TM-9||TM-10||
|a|1|1|1|1|1|1|1|1|1|1|
|b|3|3|3|3|3|3|3|3|3|3|
|c|4|4|4|4|4|4|4|4|4|4|
|d|2|2|2|2|2|2|2|2|2|2|

 

*flink-1.13.1-patch*(with jobmanager.scheduler: adaptive) operator's subtask 
distribution:
||Operator||TM-1||TM-2||TM-3||TM-4||TM-5||TM-6||TM-7||TM-8||TM-9||TM-10||
|a|1|1|1|1|1|1|1|1|1|1|
|b|3|3|3|3|3|3|3|3|3|3|
|c|4|4|4|4|4|4|4|4|4|4|
|d|2|2|2|2|2|2|2|2|2|2|

 

 
h2. Example 2: 

We will construct a job that cannot achieve an absolutely even allocation.
{code:java}
// job description is the same as in the above example.
StreamExecutionEnvironment env; 
env.addSource(...).name("a").setParallelism(10)
.map(...).name("b").setParallelism(30) 
.map(...).name("c").setParallelism(40) 
.addSink(...).name("d").setParallelism(20);
//We specify -ys 3 , that means the job will apply for 14 TaskExecutors. 
flink run -yjm 2048 -ytm 2048 -ys 3 -m yarn-cluster -c 
com.loyi.flink.demo.Bootstrap -d /home/deploy/flink-demo.jar
{code}
 

*flink-1.13.1-official*(default config) operator's subtask distribution:
||Operator||TM-1||TM-2||TM-3||TM-4||TM-5||TM-6||TM-7||TM-8||TM-9||TM-10||TM-11||TM-12||TM-13||TM-14||
|a|3|1|0|3|1|1|1|0|0|0|0|0|0|0|
|b|3|3|3|3|2|2|2|1|3|2|3|2|1|0|
|c|3|3|3|3|3|3|3|3|3|1|3|3|3|3|
|d|2|2|1|2|3|1|1|1|1|1|2|2|1|0|

 

*flink-1.13.1-official*(with cluster.evenly-spread-out-slots: true) operator's 
subtask distribution:
||Operator||TM-1||TM-2||TM-3||TM-4||TM-5||TM-6||TM-7||TM-8||TM-9||TM-10||TM-11||TM-12||TM-13||TM-14||
|a|2|1|1|1|2|2|1|0|0|0|0|0|0|0|
|b|3|3|2|1|1|3|3|2|1|3|2|3|3|0|
|c|3|3|3|1|3|3|3|3|3|3|3|3|3|3|
|d|1|2|2|1|2|3|1|2|2|3|1|0|0|0|

 

*flink-1.13.1-patch*(default config) operator's subtask distribution:
||Operator||TM-1||TM-2||TM-3||TM-4||TM-5||TM-6||TM-7||TM-8||TM-9||TM-10||TM-11||TM-12||TM-13||TM-14||
|a|1|1|1|1|1|0|1|1|0|1|0|1|0|1|
|b|3|2|2|2|2|2|3|2|2|2|2|2|2|2|
|c|3|3|3|3|3|3|3|3|3|3|3|3|1|3|
|d|2|1|2|1|1|1|2|2|1|2|1|2|2|0|

 

*flink-1.13.1-patch*(with jobmanager.scheduler: adaptive) operator's subtask 
distribution:
||Operator||TM-1||TM-2||TM-3||TM-4||TM-5||TM-6||TM-7||TM-8||TM-9||TM-10||TM-11||TM-12||TM-13||TM-14||
|a|1|0|0|1|1|1|1|0|0|1|1|1|1|1|
|b|2|2|2|2|3|3|3|2|1|2|2|2|2|2|
|c|3|3|3|3|3|3|3|3|1|3|3|3|3|3|
|d|1|1|1|1|2|2|2|1|1|2|1|2|1|2|

 

We can see that the patch has a better allocation effect, spreading each 
operator across all TaskExecutors as evenly as possible.
  


[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly

2021-07-16 Thread loyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17381794#comment-17381794
 ] 

loyi commented on FLINK-23190:
--

[~trohrmann]  Thanks for your patience.  I have implement a version based on 
flink-1.13.1, and then I will use two real examples for comparison below.
h2. Example 1:

This job could make completely evenly allocation in theory.  
// job description

StreamExecutionEnvironment env;
env.addSource(...).name("a").setParallelism(10).
 map(...).name("b").setParallelism(30)
 .map(...).name("c").setParallelism(40)
 .addSink(...).name("d").setParallelism(20);

//We specify -ys 4 , that means the job will apply for 10 TaskExecutors.
flink run -yjm 2048 -ytm 2048 -ys 4 -m yarn-cluster -c 
com.loyi.flink.demo.Bootstrap -d /home/deploy/flink-demo.jar
!image-2021-07-16-10-34-30-700.png|width=1042,height=219!

 

*flink-1.13.1-official*(default config) operator's subtask distribution:
||Operator||TM-1||TM-2||TM-3||TM-4||TM-5||TM-6||TM-7||TM-8||TM-9||TM-10||
|a|0|1|2|0|0|1|2|2|2|0|
|b|4|1|3|3|4|4|3|3|3|2|
|c|4|4|4|4|4|4|4|4|4|4|
|d|3|1|3|1|4|2|2|2|0|2|

 

*flink-1.13.1-official*(with cluster.evenly-spread-out-slots: true) operator's 
subtask distribution:

 
||Operator||TM-1||TM-2||TM-3||TM-4||TM-5||TM-6||TM-7||TM-8||TM-9||TM-10||
|a|1|1|0|2|0|3|1|0|0|2|
|b|4|1|3|4|4|3|4|2|3|2|
|c|4|4|4|4|4|4|4|4|4|4|
|d|3|1|3|2|3|2|2|1|2|1|

 

*flink-1.13.1-patch*(default config) operator's subtask distribution:
||Operator||TM-1||TM-2||TM-3||TM-4||TM-5||TM-6||TM-7||TM-8||TM-9||TM-10||
|a|1|1|1|1|1|1|1|1|1|1|
|b|3|3|3|3|3|3|3|3|3|3|
|c|4|4|4|4|4|4|4|4|4|4|
|d|2|2|2|2|2|2|2|2|2|2|

 

*flink-1.13.1-patch*(with jobmanager.scheduler: adaptive) operator's subtask 
distribution:
||Operator||TM-1||TM-2||TM-3||TM-4||TM-5||TM-6||TM-7||TM-8||TM-9||TM-10||
|a|1|1|1|1|1|1|1|1|1|1|
|b|3|3|3|3|3|3|3|3|3|3|
|c|4|4|4|4|4|4|4|4|4|4|
|d|2|2|2|2|2|2|2|2|2|2|

 

 
h2. Example 2: 

We now construct a job that cannot achieve an absolutely even allocation.
{code:java}
// job description is the same as above
StreamExecutionEnvironment env;
env.addSource(...).name("a").setParallelism(10)
 .map(...).name("b").setParallelism(30)
 .map(...).name("c").setParallelism(40)
 .addSink(...).name("d").setParallelism(20);
{code}
{code:bash}
# We specify -ys 3, which means the job will apply for 14 TaskExecutors.
flink run -yjm 2048 -ytm 2048 -ys 3 -m yarn-cluster -c com.loyi.flink.demo.Bootstrap -d /home/deploy/flink-demo.jar
{code}
 

*flink-1.13.1-official*(default config) operator's subtask distribution:
||Operator||TM-1||TM-2||TM-3||TM-4||TM-5||TM-6||TM-7||TM-8||TM-9||TM-10||TM-11||TM-12||TM-13||TM-14||
|a|3|1|0|3|1|1|1|0|0|0|0|0|0|0|
|b|3|3|3|3|2|2|2|1|3|2|3|2|1|0|
|c|3|3|3|3|3|3|3|3|3|1|3|3|3|3|
|d|2|2|1|2|3|1|1|1|1|1|2|2|1|0|

 

*flink-1.13.1-official*(with cluster.evenly-spread-out-slots: true) operator's 
subtask distribution:
||Operator||TM-1||TM-2||TM-3||TM-4||TM-5||TM-6||TM-7||TM-8||TM-9||TM-10||TM-11||TM-12||TM-13||TM-14||
|a|2|1|1|1|2|2|1|0|0|0|0|0|0|0|
|b|3|3|2|1|1|3|3|2|1|3|2|3|3|0|
|c|3|3|3|1|3|3|3|3|3|3|3|3|3|3|
|d|1|2|2|1|2|3|1|2|2|3|1|0|0|0|

 

*flink-1.13.1-patch*(default config) operator's subtask distribution:
||Operator||TM-1||TM-2||TM-3||TM-4||TM-5||TM-6||TM-7||TM-8||TM-9||TM-10||TM-11||TM-12||TM-13||TM-14||
|a|1|1|1|1|1|0|1|1|0|1|0|1|0|1|
|b|3|2|2|2|2|2|3|2|2|2|2|2|2|2|
|c|3|3|3|3|3|3|3|3|3|3|3|3|1|3|
|d|2|1|2|1|1|1|2|2|1|2|1|2|2|0|

 

*flink-1.13.1-patch*(with jobmanager.scheduler: adaptive) operator's subtask 
distribution:
||Operator||TM-1||TM-2||TM-3||TM-4||TM-5||TM-6||TM-7||TM-8||TM-9||TM-10||TM-11||TM-12||TM-13||TM-14||
|a|1|0|0|1|1|1|1|0|0|1|1|1|1|1|
|b|2|2|2|2|3|3|3|2|1|2|2|2|2|2|
|c|3|3|3|3|3|3|3|3|1|3|3|3|3|3|
|d|1|1|1|1|2|2|2|1|1|2|1|2|1|2|

 

We can see that the patch achieves a better allocation, spreading each 
operator across all TaskExecutors as evenly as possible.
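As a sanity check on the tables above, the ideal per-TaskExecutor subtask count for an operator is just its parallelism spread over the TaskExecutors, with the remainder handed out one at a time. The helper below is a hypothetical illustration (not part of the patch):

```java
import java.util.Arrays;

public class EvenSpread {
    // Ideal spread of `parallelism` subtasks over `numTms` TaskExecutors:
    // every TM gets floor(parallelism / numTms), and the first
    // (parallelism % numTms) TMs get one extra subtask.
    static int[] idealDistribution(int parallelism, int numTms) {
        int[] counts = new int[numTms];
        Arrays.fill(counts, parallelism / numTms);
        for (int i = 0; i < parallelism % numTms; i++) {
            counts[i]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Example 1: operator b with parallelism 30 over 10 TMs -> 3 everywhere
        System.out.println(Arrays.toString(idealDistribution(30, 10)));
        // Example 2: operator c with parallelism 40 over 14 TMs cannot be perfectly even
        System.out.println(Arrays.toString(idealDistribution(40, 14)));
    }
}
```

For Example 2, a perfectly even spread of 40 subtasks over 14 TaskExecutors would be 12 TMs with 3 subtasks and 2 TMs with 2, which is why some residual unevenness is unavoidable there.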
 


[jira] [Updated] (FLINK-23190) Make task-slot allocation much more evenly

2021-07-15 Thread loyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

loyi updated FLINK-23190:
-
Attachment: image-2021-07-16-10-34-30-700.png

> Make task-slot allocation much more evenly
> --
>
> Key: FLINK-23190
> URL: https://issues.apache.org/jira/browse/FLINK-23190
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.12.3
>Reporter: loyi
>Priority: Major
> Attachments: image-2021-07-16-10-34-30-700.png
>
>
> FLINK-12122 only guarantees spreading out tasks across the set of TMs which 
> are registered at the time of scheduling, but our jobs all run in 
> active YARN mode, and jobs with smaller source parallelism often cause 
> load-balance issues. 
>  
> For this job:
> {code:java}
> //  -ys 4 means 10 taskmanagers
> env.addSource(...).name("A").setParallelism(10).
>  map(...).name("B").setParallelism(30)
>  .map(...).name("C").setParallelism(40)
>  .addSink(...).name("D").setParallelism(20);
> {code}
>  
>  Flink-1.12.3 task allocation: 
> ||operator||tm1||tm2||tm3||tm4||tm5||tm6||tm7||tm8||tm9||tm10||
> |A| 
> 1|{color:#de350b}2{color}|{color:#de350b}2{color}|1|1|{color:#de350b}3{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|
> |B|3|3|3|3|3|3|3|3|{color:#de350b}2{color}|{color:#de350b}4{color}|
> |C|4|4|4|4|4|4|4|4|4|4|
> |D|2|2|2|2|2|{color:#de350b}1{color}|{color:#de350b}1{color}|2|2|{color:#de350b}4{color}|
>  
> Suggestions:
> When a TaskManager starts registering slots with the SlotManager, the current 
> processing logic chooses the first pending slot that meets its resource 
> requirements. This "random" strategy usually causes uneven task allocation 
> when a source operator's parallelism is significantly below a processing 
> operator's. A simple, feasible idea is to {color:#de350b}partition{color} the 
> current "{color:#de350b}pendingSlots{color}" by their "JobVertexIds" (e.g. let 
> the AllocationID carry this detail), then allocate the slots proportionally to 
> each JobVertex group.
>  
> For the above case, the 40 pending slots can be divided into 4 groups:
> [ABCD]: 10        // A, B, C, D represent {color:#de350b}jobVertexId{color}s
> [BCD]: 10
> [CD]: 10
> [D]: 10
>  
> Every TaskManager provides 4 slots at a time, and each group gets 1 
> slot according to its proportion (1/4); the final allocation result is below:
> [ABCD]: deployed on 10 different TaskManagers
> [BCD]: deployed on 10 different TaskManagers
> [CD]: deployed on 10 different TaskManagers
> [D]: deployed on 10 different TaskManagers
>  
> I have implemented a [concept 
> code|https://github.com/saddays/flink/commit/dc82e60a7c7599fbcb58c14f8e3445bc8d07ace1] 
> based on Flink-1.12.3; the patched version achieves {color:#de350b}fully 
> even{color} task allocation and works well on my workload. Are there 
> other points that have not been considered, or does it conflict with future 
> plans? Sorry for my poor English.
>  
>  
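The proportional allocation in the quoted suggestion can be sketched as follows. This is an illustrative simplification (group keys as plain strings, a floor-then-remainder split), not the actual SlotManager code:

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

public class ProportionalSlots {
    // Split an offered batch of slots across vertex groups in proportion to
    // each group's share of the pending backlog. Rounding leftovers are
    // handed out one at a time in group order.
    static Map<String, Integer> quotas(LinkedHashMap<String, Integer> pending, int offered) {
        int total = pending.values().stream().mapToInt(Integer::intValue).sum();
        Map<String, Integer> quota = new LinkedHashMap<>();
        int handedOut = 0;
        for (Map.Entry<String, Integer> e : pending.entrySet()) {
            int q = offered * e.getValue() / total; // floor of the proportional share
            quota.put(e.getKey(), q);
            handedOut += q;
        }
        Iterator<String> it = quota.keySet().iterator();
        while (handedOut < offered && it.hasNext()) { // distribute the remainder
            quota.merge(it.next(), 1, Integer::sum);
            handedOut++;
        }
        return quota;
    }

    public static void main(String[] args) {
        LinkedHashMap<String, Integer> pending = new LinkedHashMap<>();
        pending.put("[ABCD]", 10);
        pending.put("[BCD]", 10);
        pending.put("[CD]", 10);
        pending.put("[D]", 10);
        // A TaskManager offering 4 slots: each group gets its 1/4 share.
        System.out.println(quotas(pending, 4)); // {[ABCD]=1, [BCD]=1, [CD]=1, [D]=1}
    }
}
```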



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (FLINK-23190) Make task-slot allocation much more evenly

2021-07-15 Thread loyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

loyi updated FLINK-23190:
-
Comment: was deleted

(was: Job description:

// flink run -yjm 2048 -ytm 2048 -ys 4 -ynm demo-job -m yarn-cluster -c 
com.loyi.flink.demo.Bootstrap -d /home/deploy/flink-demo/flink-demo.jar
env.addSource(...).name("A").setParallelism(10)
 .map(...).name("B").setParallelism(30)
 .map(...).name("C").setParallelism(40)
 .addSink(...).name("D").setParallelism(20);)






[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly

2021-07-15 Thread loyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17381661#comment-17381661
 ] 

loyi commented on FLINK-23190:
--

Job description:

{code:java}
// flink run -yjm 2048 -ytm 2048 -ys 4 -ynm demo-job -m yarn-cluster -c
//   com.loyi.flink.demo.Bootstrap -d /home/deploy/flink-demo/flink-demo.jar
env.addSource(...).name("A").setParallelism(10)
 .map(...).name("B").setParallelism(30)
 .map(...).name("C").setParallelism(40)
 .addSink(...).name("D").setParallelism(20);
{code}






[jira] [Comment Edited] (FLINK-23190) Make task-slot allocation much more evenly

2021-07-15 Thread loyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17381353#comment-17381353
 ] 

loyi edited comment on FLINK-23190 at 7/15/21, 1:36 PM:


[~trohrmann]   Here are the differences:

    1. cluster.evenly-spread-out-slots always picks the best slots and then 
allocates them in slot-request order; however, the request order is sometimes 
not "stable", which affects the allocation. Here is the 
reason:

 
{code:java}
// class: SlotSharingExecutionSlotAllocator.java

public List<SlotExecutionVertexAssignment> allocateSlotsFor(
        List<ExecutionVertexID> executionVertexIds) {
    // ...

    Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> executionsByGroup =
            executionVertexIds.stream()
                    .collect(
                            Collectors.groupingBy(
                                    slotSharingStrategy::getExecutionSlotSharingGroup));

    // The iteration order of the key set is uncertain, so too many slot
    // requests (belonging to the same SlotSharingGroup) may be allocated to
    // the same TaskExecutor.
    Map<ExecutionSlotSharingGroup, SharedSlot> slots =
            executionsByGroup.keySet().stream()
                    .map(group -> getOrAllocateSharedSlot(group, sharedSlotProfileRetriever))
                    .collect(
                            Collectors.toMap(
                                    SharedSlot::getExecutionSlotSharingGroup,
                                    Function.identity()));
    // ...
}
{code}
In addition, the option doesn't work for active-deployment jobs.
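The iteration-order issue can be seen with a plain HashMap (illustrative only; the group names below are made up):

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class IterationOrder {
    public static void main(String[] args) {
        // A HashMap iterates in hash-bucket order, not insertion order, so the
        // order in which sharing groups are visited is unrelated to the order
        // in which the slot requests arrived.
        Map<String, Integer> byHash = new HashMap<>();
        Map<String, Integer> byInsertion = new LinkedHashMap<>();
        for (String g : new String[] {"group-7", "group-1", "group-12"}) {
            byHash.put(g, 0);
            byInsertion.put(g, 0);
        }
        System.out.println(byHash.keySet());      // hash order, not necessarily insertion order
        System.out.println(byInsertion.keySet()); // insertion order: [group-7, group-1, group-12]
    }
}
```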

 

    2.  My solution tries to allocate the slots (belonging to the same 
TaskExecutor) to each SlotSharingGroup *in turn*. For example, suppose a job 
has 12 ExecutionSlotSharingGroups and a TaskExecutor offers 3 slots (via 
JobMaster.offerSlots).

First, I group the slot requests by SlotSharingGroup (not 
ExecutionSlotSharingGroup):

A: 4

B: 4

C: 4

Then I allocate one slot to each SlotSharingGroup in turns, which should be 
more even than allocating in request order.
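The turn-based idea can be sketched like this (illustrative only; the group names and counts come from the example above, not real Flink types):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TurnBasedAssigner {
    // Distribute the slots offered by ONE TaskExecutor across slot sharing
    // groups one per turn, instead of draining them in request order.
    static Map<String, Integer> assignInTurns(LinkedHashMap<String, Integer> pending, int offered) {
        Map<String, Integer> assigned = new LinkedHashMap<>();
        pending.keySet().forEach(g -> assigned.put(g, 0));
        boolean progress = true;
        while (offered > 0 && progress) {
            progress = false;
            for (String g : pending.keySet()) {
                if (offered > 0 && assigned.get(g) < pending.get(g)) {
                    assigned.merge(g, 1, Integer::sum); // one slot for this group this turn
                    offered--;
                    progress = true;
                }
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        LinkedHashMap<String, Integer> pending = new LinkedHashMap<>();
        pending.put("A", 4);
        pending.put("B", 4);
        pending.put("C", 4);
        // One TaskExecutor offers 3 slots: each group gets exactly one.
        System.out.println(assignInTurns(pending, 3)); // {A=1, B=1, C=1}
    }
}
```

With request-order draining, all 3 offered slots could instead go to group A; the round-robin keeps any single TaskExecutor from being monopolized by one group.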

 

 

 

 





[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly

2021-07-14 Thread loyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17380372#comment-17380372
 ] 

loyi commented on FLINK-23190:
--

Hi [~trohrmann], sorry about my poor English again; maybe I didn't understand 
the true intention of your words above. Could you give me some advice on how 
to continue this process?






[jira] [Comment Edited] (FLINK-23190) Make task-slot allocation much more evenly

2021-07-12 Thread loyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17379093#comment-17379093
 ] 

loyi edited comment on FLINK-23190 at 7/12/21, 10:03 AM:
-

[~trohrmann]  You are right!
If a job needs 40 physical slots and -ys 3 is specified, it will eventually 
apply for 14 TaskExecutors. What the ResourceManager can improve is applying 
for 12 TaskExecutors with 3 slots and 2 TaskExecutors with 2 slots; if not 
optimized, it may apply for 13 TaskExecutors with 3 slots and 1 TaskExecutor 
with 1 slot, which does affect even slot allocation. But this can be avoided 
by adjusting the job's parallelism (-ys, etc.). If every task's parallelism 
can be divided exactly by the number of TaskExecutors, I think it is fine to 
just let the JobMaster handle this.
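The arithmetic above can be captured in a small helper (hypothetical, just to illustrate the 12×3 + 2×2 split):

```java
public class ExecutorSizing {
    // For `totalSlots` physical slots with at most `slotsPerTm` slots per
    // TaskExecutor: ceil(total / perTm) TMs are needed, and spreading the
    // shortfall means `small` TMs carry one slot fewer than the rest,
    // instead of one TM being left with a single slot.
    static int[] balancedSizes(int totalSlots, int slotsPerTm) {
        int numTms = (totalSlots + slotsPerTm - 1) / slotsPerTm; // ceil division
        int small = numTms * slotsPerTm - totalSlots; // TMs with slotsPerTm - 1 slots
        return new int[] {numTms - small, small};
    }

    public static void main(String[] args) {
        int[] r = balancedSizes(40, 3);
        // 40 slots with -ys 3 -> 14 TMs: 12 TMs x 3 slots + 2 TMs x 2 slots
        System.out.println(r[0] + " TMs with 3 slots, " + r[1] + " TMs with 2 slots");
    }
}
```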


was (Author: loyi):
[~trohrmann]  You are right!
If a job needs 40 physical slots and -ys 3 is specified, it will eventually 
apply for 14 TaskExecutors. What the ResourceManager can improve is applying 
for 12 TaskExecutors with 3 slots and 2 TaskExecutors with 2 slots; if not 
optimized, it may apply for 13 TaskExecutors with 3 slots and 1 TaskExecutor 
with 1 slot, which does affect even slot allocation. But this can be avoided 
by adjusting the job's parallelism (-ys, etc.), hence the main allocation can 
be decided by the JobMaster.






[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly

2021-07-12 Thread loyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17379093#comment-17379093
 ] 

loyi commented on FLINK-23190:
--

[~trohrmann]  You are right!
If a job need 40 phy-slots, and specified ys 3 ,  it will evently apply 14 
TaskExecutors.   What ResourceManager can improve is  applying 12 TaskExecutors 
with 3 slots and 2 TaskExecutors with 2 slots.  If not optimized, it may apply 
13 TaskExecutors with 3 slots and 1 TaskExecutors with 1 slots.  It does cause 
a impact on evenly slot allocation.    But it could be avoid by adjusting the 
parallelism (-ys, etc) of job, hence the main allocation could be decided by 
Jobmaster.    

> Make task-slot allocation much more evenly
> --
>
> Key: FLINK-23190
> URL: https://issues.apache.org/jira/browse/FLINK-23190
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.12.3
>Reporter: loyi
>Priority: Major
>
> FLINK-12122 only guarantees spreading out tasks across the set of TMs which 
> are registered at the time of scheduling, but our jobs are all runing on 
> active yarn mode, the job with smaller source parallelism offen cause 
> load-balance issues. 
>  
> For this job:
> {code:java}
> //  -ys 4 means 10 taskmanagers
> env.addSource(...).name("A").setParallelism(10).
>  map(...).name("B").setParallelism(30)
>  .map(...).name("C").setParallelism(40)
>  .addSink(...).name("D").setParallelism(20);
> {code}
>  
>  Flink-1.12.3 task allocation: 
> ||operator||tm1 ||tm2||tm3||tm4||tm5||5m6||tm7||tm8||tm9||tm10||
> |A| 
> 1|{color:#de350b}2{color}|{color:#de350b}2{color}|1|1|{color:#de350b}3{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|
> |B|3|3|3|3|3|3|3|3|{color:#de350b}2{color}|{color:#de350b}4{color}|
> |C|4|4|4|4|4|4|4|4|4|4|
> |D|2|2|2|2|2|{color:#de350b}1{color}|{color:#de350b}1{color}|2|2|{color:#de350b}4{color}|
>  
> Suggestions:
> When a TaskManager starts registering slots to the SlotManager, the current 
> processing logic chooses the first pendingSlot which meets its resource 
> requirements. This "random" strategy usually causes uneven task allocation 
> when a source operator's parallelism is significantly below the processing 
> operators'. A simple, feasible idea is to {color:#de350b}partition{color} the 
> current "{color:#de350b}pendingSlots{color}" by their "JobVertexIds" (e.g. by 
> letting the AllocationID carry that detail), then allocate the slots 
> proportionally to each JobVertexGroup.
>  
> For above case, the 40 pendingSlots could be divided into 4 groups:
> [ABCD]: 10        // A, B, C, D represent {color:#de350b}jobVertexId{color}s
> [BCD]: 10
> [CD]: 10
> [D]: 10
>  
> Every TaskManager will provide 4 slots at a time, and each group will get 1 
> slot according to its proportion (1/4); the final allocation result is below:
> [ABCD]: deployed on 10 different TaskManagers
> [BCD]: deployed on 10 different TaskManagers
> [CD]: deployed on 10 different TaskManagers
> [D]: deployed on 10 different TaskManagers
>  
> I have implemented a [concept 
> code|https://github.com/saddays/flink/commit/dc82e60a7c7599fbcb58c14f8e3445bc8d07ace1] 
> based on Flink-1.12.3; the patched version has {color:#de350b}fully 
> even{color} task allocation and works well on my workload. Are there other 
> points that have not been considered, or does it conflict with future plans? 
> Sorry for my poor English.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly

2021-07-12 Thread loyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17379046#comment-17379046
 ] 

loyi commented on FLINK-23190:
--

[~trohrmann] hi, maybe we don't need to add an extra strategy to the 
ResourceManager; as you said above, adding a different slot selection strategy 
in the JobMaster is enough for this issue.

In addition, the ResourceManager should only be responsible for applying for 
resources; do I understand that correctly?

Looking forward to your reply



[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly

2021-07-10 Thread loyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17378430#comment-17378430
 ] 

loyi commented on FLINK-23190:
--

[~trohrmann]  Do you mean the solution couldn't achieve a totally even 
allocation because we don't implement the logic in the ResourceManager?

I think the JobMaster can make an even allocation without telling the 
slot-sharing details to the ResourceManager: the allocation happens when the 
ResourceManager offers new slots to the JobMaster, and the current solution 
can assign each TaskManager's slots as evenly as possible. I have also tested 
several real workloads, and it performs well in all of them.
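As a rough sketch of that slot-offer handling (the names are hypothetical; this is not the actual JobMaster code), assuming pending requests are tracked per slot-sharing group, each batch of offered slots can be spread round-robin across the groups in proportion to their remaining requests:

```python
def assign_offered_slots(offered_slots, pending):
    """Assign one TaskManager's batch of offered slots to slot-sharing
    groups round-robin, so the batch is spread evenly across groups.

    offered_slots: list of slot ids offered by one TaskManager
    pending:       dict group -> number of remaining slot requests
    """
    assignment = {g: [] for g in pending}
    order = list(pending)  # fixed group order, e.g. [ABCD, BCD, CD, D]
    i = 0
    for slot in offered_slots:
        # advance to the next group that still has pending requests;
        # if every group is satisfied, the slot stays unassigned
        for _ in range(len(order)):
            g = order[i % len(order)]
            i += 1
            if pending[g] > 0:
                assignment[g].append(slot)
                pending[g] -= 1
                break
    return assignment
```

With the 4 groups from the issue description (10 pending requests each) and batches of 4 slots per TaskManager, every batch gives exactly one slot to each group.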
 
 



[jira] [Comment Edited] (FLINK-23190) Make task-slot allocation much more evenly

2021-07-09 Thread loyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17377343#comment-17377343
 ] 

loyi edited comment on FLINK-23190 at 7/9/21, 7:45 AM:
---

[~trohrmann]  thanks for your explanation.  I tried implementing my idea on 
Flink-1.13.1, and it works well with both the default 
*declarative-resource-management* and the *AdaptiveScheduler*.

 

Here is the idea:
 # When *SlotSharingExecutionSlotAllocator* starts a SlotRequestId, we could 
let the request carry the "detail" of its ExecutionGroup before it is put into 
*pendingRequests*.
 # Once the *NewSlotsListener* triggers, for example when 4 slots are provided, 
we could calculate the proportion of each group from these *requests* and 
assign the slots evenly to the groups.
 # Make the *AdaptiveScheduler* allocate free slots to each SlotSharingGroup in 
*circle* (round-robin) order over the TaskManagers instead of plain 
*iteration* order.

 

Considering that we haven't reached consensus yet, I put the concept code in 
my repository:

Support DeclarativeResourceManagement:

[https://github.com/saddays/flink/commit/9e3c51664f77aee1b7ff8b68726a6b5eaa630e5b]

 

Support AdaptiveScheduler:

https://github.com/saddays/flink/commit/18e87f794cf2956fa01592e9077d1a5f467ed05a

 

Is this solution feasible?    Looking forward to your reply!
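Step 3 (the *circle* order) can be sketched as follows; this is a hypothetical helper assuming free slots are grouped per TaskManager, not the real AdaptiveScheduler code:

```python
def pick_slots_round_robin(free_slots_by_tm, n):
    """Pick n free slots by cycling over TaskManagers ("circle" order)
    instead of exhausting one TaskManager before moving to the next.

    free_slots_by_tm: dict tm_id -> list of free slot ids
    """
    # copy the queues so the caller's structure is left untouched
    queues = {tm: list(slots) for tm, slots in free_slots_by_tm.items()}
    picked = []
    while len(picked) < n and any(queues.values()):
        for tm, q in queues.items():
            if q and len(picked) < n:
                picked.append((tm, q.pop(0)))
    return picked

free = {"tm1": ["a", "b"], "tm2": ["c", "d"], "tm3": ["e"]}
print(pick_slots_round_robin(free, 3))  # one slot from each TaskManager
```

With plain iteration order, the first 3 picks would all come from tm1 and tm2; cycling spreads them over all three TaskManagers.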


was (Author: loyi):
[~trohrmann]  thanks for your explanation.  I try implement my idea on 
Flink-1.13.1, and it could work well with the default 
*declarative-resource-management*  and *AdaptiveScheduler*.   

 

Here is the idea:
 # When *SlotSharingExecutionSlotAllocator* start a SlotRequestId, we could let 
the request bring the "detail" of ExecutionGroup, and then it will be put in 
*pendingRequests*.
 # Once *NewSlotsListener* triggers, for example provide 4 slots, we could 
calculate the proportion of each group by these *requests* , and assign the 
slot evenly to these groups.
 #  Make *AdaptiveScheduler* allocate free slots to SlotSharingGroup by 
*circle* order over these TaskManagers instead of  *Iteration* order.

 

Considering that we haven't reached consensus yet,i put the concept code on my 
reposity:   
[https://github.com/saddays/flink/commit/9e3c51664f77aee1b7ff8b68726a6b5eaa630e5b]

 

Is this solution feasible?    Looking forward to your reply!


[jira] [Comment Edited] (FLINK-23190) Make task-slot allocation much more evenly

2021-07-09 Thread loyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17377343#comment-17377343
 ] 

loyi edited comment on FLINK-23190 at 7/9/21, 7:42 AM:
---

[~trohrmann]  thanks for your explanation.  I tried implementing my idea on 
Flink-1.13.1, and it works well with both the default 
*declarative-resource-management* and the *AdaptiveScheduler*.

 

Here is the idea:
 # When *SlotSharingExecutionSlotAllocator* starts a SlotRequestId, we could 
let the request carry the "detail" of its ExecutionGroup before it is put into 
*pendingRequests*.
 # Once the *NewSlotsListener* triggers, for example when 4 slots are provided, 
we could calculate the proportion of each group from these *requests* and 
assign the slots evenly to the groups.
 # Make the *AdaptiveScheduler* allocate free slots to each SlotSharingGroup in 
*circle* (round-robin) order over the TaskManagers instead of plain 
*iteration* order.

 

Considering that we haven't reached consensus yet, I put the concept code in 
my repository: 
[https://github.com/saddays/flink/commit/9e3c51664f77aee1b7ff8b68726a6b5eaa630e5b]

 

Is this solution feasible?    Looking forward to your reply!


was (Author: loyi):
[~trohrmann]  thanks for your explanation.  I try implement my idea on 
Flink-1.13.1, and it could work well with the default 
*declarative-resource-management* feature,but not with AdaptiveScheduler.   

 

Here is the idea:
 # When *SlotSharingExecutionSlotAllocator* start a SlotRequestId, we could let 
the request bring the "detail" of ExecutionGroup, and then it will be put in 
*pendingRequests*.
 # Once *NewSlotsListener* triggers, for example provide 4 slots, we could 
calculate the proportion of each group by these *requests* , and assign the 
slot evenly to these groups.

 

Considering that we haven't reached consensus yet,i put the concept code on my 
reposity:   
https://github.com/saddays/flink/commit/9e3c51664f77aee1b7ff8b68726a6b5eaa630e5b

 

Is this solution feasible?    Looking forward to your reply!


[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly

2021-07-08 Thread loyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17377343#comment-17377343
 ] 

loyi commented on FLINK-23190:
--

[~trohrmann]  thanks for your explanation.  I tried implementing my idea on 
Flink-1.13.1, and it works well with the default 
*declarative-resource-management* feature, but not with the AdaptiveScheduler.

 

Here is the idea:
 # When *SlotSharingExecutionSlotAllocator* starts a SlotRequestId, we could 
let the request carry the "detail" of its ExecutionGroup before it is put into 
*pendingRequests*.
 # Once the *NewSlotsListener* triggers, for example when 4 slots are provided, 
we could calculate the proportion of each group from these *requests* and 
assign the slots evenly to the groups.

 

Considering that we haven't reached consensus yet, I put the concept code in 
my repository:   
https://github.com/saddays/flink/commit/9e3c51664f77aee1b7ff8b68726a6b5eaa630e5b

 

Is this solution feasible?    Looking forward to your reply!



[jira] [Comment Edited] (FLINK-23190) Make task-slot allocation much more evenly

2021-07-06 Thread loyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17375424#comment-17375424
 ] 

loyi edited comment on FLINK-23190 at 7/6/21, 10:23 AM:


[~trohrmann],  it seems the solution only works for active resource 
management, and I think it may not conflict with the AdaptiveScheduler, since 
we just spread out the tasks according to the *current* pending slot requests. 

Also, are the two features you mentioned already available?  Can they solve 
the load-balance issues?   thx


was (Author: loyi):
[~trohrmann]  ,  seems the solution just  works for active resource management .

Also , are the two features you mentioned already available?  Can them solve 
the load-balance issues ?   thx



[jira] [Comment Edited] (FLINK-23190) Make task-slot allocation much more evenly

2021-07-06 Thread loyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17375432#comment-17375432
 ] 

loyi edited comment on FLINK-23190 at 7/6/21, 10:05 AM:


[~pnowojski], I have tried the option you mentioned; it seems to work only for 
yarn-session or standalone mode.  thx


was (Author: loyi):
[~pnowojski] , i have tried the option you mentioned , it seems just works for 
yarn-session or standalone mode.



[jira] [Comment Edited] (FLINK-23190) Make task-slot allocation much more evenly

2021-07-06 Thread loyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17375424#comment-17375424
 ] 

loyi edited comment on FLINK-23190 at 7/6/21, 10:05 AM:


[~trohrmann],  it seems the solution only works for active resource management.

Also, are the two features you mentioned already available?  Can they solve 
the load-balance issues?   thx


was (Author: loyi):
[~trohrmann]  ,  seems the solution just  works for action resource management .

Also , are the two features you mentioned already available?  Can them solve 
the load-balance issues ?   

> Make task-slot allocation much more evenly
> --
>
> Key: FLINK-23190
> URL: https://issues.apache.org/jira/browse/FLINK-23190
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.12.3
>Reporter: loyi
>Priority: Major
>
> FLINK-12122 only guarantees spreading out tasks across the set of TMs which 
> are registered at the time of scheduling, but our jobs are all runing on 
> active yarn mode, the job with smaller source parallelism offen cause 
> load-balance issues. 
>  
> For this job:
> {code:java}
> //  -ys 4 means 10 taskmanagers
> env.addSource(...).name("A").setParallelism(10).
>  map(...).name("B").setParallelism(30)
>  .map(...).name("C").setParallelism(40)
>  .addSink(...).name("D").setParallelism(20);
> {code}
>  
>  Flink-1.12.3 task allocation: 
> ||operator||tm1 ||tm2||tm3||tm4||tm5||5m6||tm7||tm8||tm9||tm10||
> |A| 
> 1|{color:#de350b}2{color}|{color:#de350b}2{color}|1|1|{color:#de350b}3{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|
> |B|3|3|3|3|3|3|3|3|{color:#de350b}2{color}|{color:#de350b}4{color}|
> |C|4|4|4|4|4|4|4|4|4|4|
> |D|2|2|2|2|2|{color:#de350b}1{color}|{color:#de350b}1{color}|2|2|{color:#de350b}4{color}|
>  
> Suggestions:
> When TaskManger start register slots to slotManager , current processing 
> logic will choose  the first pendingSlot which meet its resource 
> requirements.  The "random" strategy usually causes uneven task allocation 
> when source-operator's parallelism is significantly below process-operator's. 
>   A simple feasible idea  is  {color:#de350b}partition{color} the current  
> "{color:#de350b}pendingSlots{color}" by their "JobVertexIds" (such as  let 
> AllocationID bring the detail)  , then allocate the slots proportionally to 
> each JobVertexGroup.
>  
> For above case, the 40 pendingSlots could be divided into 4 groups:
> [ABCD]: 10        // A、B、C、D reprents  {color:#de350b}jobVertexId{color}
> [BCD]: 10
> [CD]: 10
> [D]: 10
>  
> Every TaskManager provides 4 slots at a time, and each group gets 1 slot 
> according to its proportion (1/4); the final allocation result is below:
> [ABCD]: deployed on 10 different TaskManagers
> [BCD]: deployed on 10 different TaskManagers
> [CD]: deployed on 10 different TaskManagers
> [D]: deployed on 10 different TaskManagers
>  
> I have implemented a [concept 
> code|https://github.com/saddays/flink/commit/dc82e60a7c7599fbcb58c14f8e3445bc8d07ace1]
>  based on Flink-1.12.3; the patched version achieves {color:#de350b}fully 
> even{color} task allocation and works well on my workload. Are there 
> other points that have not been considered, or does it conflict with future 
> plans?      Sorry for my poor English.
>  
>  
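The partition-and-allocate idea above can be sketched as follows. This is a minimal, standalone illustration under stated assumptions: the class, method names, and the string encoding of each slot's job-vertex-id set (e.g. "ABCD") are all hypothetical placeholders, not Flink's actual SlotManager API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SlotGroupingSketch {

    // Partition pending slots by the set of job-vertex ids each slot will host,
    // e.g. "ABCD" for a slot running one subtask each of A, B, C and D.
    static Map<String, Integer> groupPendingSlots(List<String> pendingSlots) {
        Map<String, Integer> groups = new LinkedHashMap<>();
        for (String vertexIds : pendingSlots) {
            groups.merge(vertexIds, 1, Integer::sum);
        }
        return groups;
    }

    // When a TaskManager offers slotsPerTm slots, give each group a share
    // proportional to its size instead of draining the pending list in
    // arrival order (the behavior that causes the uneven table above).
    static Map<String, Integer> allocateProportionally(
            Map<String, Integer> groups, int slotsPerTm) {
        int total = groups.values().stream().mapToInt(Integer::intValue).sum();
        Map<String, Integer> share = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : groups.entrySet()) {
            share.put(e.getKey(), slotsPerTm * e.getValue() / total);
        }
        return share;
    }

    public static void main(String[] args) {
        // The 40 pending slots of the A(10)/B(30)/C(40)/D(20) job above.
        List<String> pending = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            pending.add("ABCD");
            pending.add("BCD");
            pending.add("CD");
            pending.add("D");
        }
        Map<String, Integer> groups = groupPendingSlots(pending);
        System.out.println(groups);   // prints {ABCD=10, BCD=10, CD=10, D=10}
        // With -ys 4, each TaskManager offer of 4 slots gives every group 1 slot.
        System.out.println(allocateProportionally(groups, 4));
    }
}
```

Repeating the proportional split for each of the 10 TaskManager offers spreads every group across all 10 TaskManagers, matching the "deployed on 10 different TaskManagers" result described above.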



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly

2021-07-06 Thread loyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17375432#comment-17375432
 ] 

loyi commented on FLINK-23190:
--

[~pnowojski], I have tried the option you mentioned; it seems to work only for 
yarn-session or standalone mode.



[jira] [Comment Edited] (FLINK-23190) Make task-slot allocation much more evenly

2021-07-06 Thread loyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17375424#comment-17375424
 ] 

loyi edited comment on FLINK-23190 at 7/6/21, 10:01 AM:


[~trohrmann], it seems the solution only works for active resource management.

Also, are the two features you mentioned already available? Can they solve 
the load-balance issues?


was (Author: loyi):
Till Rohrmann, it seems the solution only works for active resource management.

Also, are the two features you mentioned already available? Can they solve 
the load-balance issues?



[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly

2021-07-06 Thread loyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17375424#comment-17375424
 ] 

loyi commented on FLINK-23190:
--

Till Rohrmann, it seems the solution only works for active resource management.

Also, are the two features you mentioned already available? Can they solve 
the load-balance issues?



[jira] [Updated] (FLINK-23190) Make task-slot allocation much more evenly

2021-07-02 Thread loyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

loyi updated FLINK-23190:
-
Flags: Patch



[jira] [Updated] (FLINK-23190) Make task-slot allocation much more evenly

2021-07-02 Thread loyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

loyi updated FLINK-23190:
-
Description: 
  was:
Description:

FLINK-12122 only guarantees spreading out tasks across the set of TMs which are 
registered at the time of scheduling, but our jobs all run in active YARN mode, 
and a job with smaller source parallelism often causes load-balance issues. 

 

For this job:
{code:java}
//  -ys 4 means 10 taskmanagers

env.addSource(...).name("A").setParallelism(10).
 map(...).name("B").setParallelism(30)
 .map(...).name("C").setParallelism(40)
 .addSink(...).name("D").setParallelism(20);
{code}
 

 released-1.12.3  allocation: 
||operator||tm1 ||tm2||tm3||tm4||tm5||tm6||tm7||tm8||tm9||tm10||
|A| 
1|{color:#de350b}2{color}|{color:#de350b}2{color}|1|1|{color:#de350b}3{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|
|B|3|3|3|3|3|3|3|3|{color:#de350b}2{color}|{color:#de350b}4{color}|
|C|4|4|4|4|4|4|4|4|4|4|
|D|2|2|2|2|2|{color:#de350b}1{color}|{color:#de350b}1{color}|2|2|{color:#de350b}4{color}|

 

Suggestions:

When a TM registers slots with the SlotManager, we could group the pendingRequests 
by their "ExecutionVertexGroup", then allocate the slots proportionally to each 
group.

 

I have implemented a concept version based on release-1.12.3; the job gets fully 
even task allocation. I want to know whether there are other points that have not 
been considered.

 

 

 



[jira] [Updated] (FLINK-23190) Make task-slot allocation much more evenly

2021-06-30 Thread loyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

loyi updated FLINK-23190:
-
Priority: Major  (was: Minor)



[jira] [Updated] (FLINK-23190) Make task-slot allocation much more evenly

2021-06-30 Thread loyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

loyi updated FLINK-23190:
-


[jira] [Created] (FLINK-23190) Make task-slot allocation much more evenly

2021-06-29 Thread loyi (Jira)
loyi created FLINK-23190:


 Summary: Make task-slot allocation much more evenly
 Key: FLINK-23190
 URL: https://issues.apache.org/jira/browse/FLINK-23190
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Task
Affects Versions: 1.12.3
Reporter: loyi

