[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly
[ https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17738851#comment-17738851 ]

loyi commented on FLINK-23190:
--
[~heigebupahei] I implemented it on Flink 1.13.2, so I am not sure whether the difference is related to version changes. I will try to find out the reason, but I cannot guarantee a fix.

> Make task-slot allocation much more evenly
> --
>
> Key: FLINK-23190
> URL: https://issues.apache.org/jira/browse/FLINK-23190
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Coordination
> Affects Versions: 1.14.0, 1.12.3, 1.13.1
> Reporter: loyi
> Assignee: loyi
> Priority: Major
> Labels: pull-request-available, stale-assigned
> Attachments: image-2021-07-16-10-34-30-700.png
>
> FLINK-12122 only guarantees spreading out tasks across the set of TMs which
> are registered at the time of scheduling, but our jobs all run in active
> YARN mode, and jobs with a smaller source parallelism often cause
> load-balance issues.
>
> For this job:
> {code:java}
> // -ys 4 means 10 taskmanagers
> env.addSource(...).name("A").setParallelism(10)
>     .map(...).name("B").setParallelism(30)
>     .map(...).name("C").setParallelism(40)
>     .addSink(...).name("D").setParallelism(20);
> {code}
>
> Flink-1.12.3 task allocation:
> ||operator||tm1||tm2||tm3||tm4||tm5||tm6||tm7||tm8||tm9||tm10||
> |A|1|{color:#de350b}2{color}|{color:#de350b}2{color}|1|1|{color:#de350b}3{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|
> |B|3|3|3|3|3|3|3|3|{color:#de350b}2{color}|{color:#de350b}4{color}|
> |C|4|4|4|4|4|4|4|4|4|4|
> |D|2|2|2|2|2|{color:#de350b}1{color}|{color:#de350b}1{color}|2|2|{color:#de350b}4{color}|
>
> Suggestions:
> When a TaskManager starts registering its slots with the SlotManager, the
> current logic chooses the first pending slot that meets the resource
> requirements. This effectively "random" strategy usually causes uneven task
> allocation when the source operator's parallelism is significantly below
> that of the processing operators.
> A simple, feasible idea is to {color:#de350b}partition{color} the current
> "{color:#de350b}pendingSlots{color}" by their "JobVertexIds" (for example,
> by letting the AllocationID carry that detail), then allocate the slots
> proportionally to each JobVertexGroup.
>
> For the above case, the 40 pending slots could be divided into 4 groups:
> [ABCD]: 10 // A, B, C, D represent {color:#de350b}jobVertexIds{color}
> [BCD]: 10
> [CD]: 10
> [D]: 10
>
> Every TaskManager provides 4 slots at a time, and each group gets 1 slot
> according to its proportion (1/4), so the final allocation result is:
> [ABCD]: deployed on 10 different TaskManagers
> [BCD]: deployed on 10 different TaskManagers
> [CD]: deployed on 10 different TaskManagers
> [D]: deployed on 10 different TaskManagers
>
> I have implemented a [concept
> code|https://github.com/saddays/flink/commit/dc82e60a7c7599fbcb58c14f8e3445bc8d07ace1]
> based on Flink-1.12.3; the patched version achieves {color:#de350b}fully
> even{color} task allocation and works well on my workload. Are there other
> points that have not been considered, or does it conflict with future
> plans? Sorry for my poor English.
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
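The proportional-grouping idea quoted above can be sketched in a few lines. This is a standalone, hypothetical illustration of the allocation arithmetic only, not the actual patch code; the class `SlotGroupingSketch` and method `allocate` are invented names, and it assumes the slot count per TaskManager divides evenly by the number of groups, as in the example from the issue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SlotGroupingSketch {

    // Give each group a proportional share of every TaskManager's slot offer:
    // each TM offers slotsPerTm slots, and each group receives
    // slotsPerTm / groups.size() of them per TM.
    static Map<String, List<Integer>> allocate(
            List<String> groups, int numTaskManagers, int slotsPerTm) {
        Map<String, List<Integer>> placement = new LinkedHashMap<>();
        for (String g : groups) {
            placement.put(g, new ArrayList<>());
        }
        int share = slotsPerTm / groups.size();
        for (int tm = 1; tm <= numTaskManagers; tm++) {
            for (String g : groups) {
                for (int i = 0; i < share; i++) {
                    placement.get(g).add(tm); // group g gets `share` slots on TM `tm`
                }
            }
        }
        return placement;
    }

    public static void main(String[] args) {
        // The example from the issue: groups [ABCD], [BCD], [CD], [D],
        // 10 TaskManagers with 4 slots each (-ys 4).
        Map<String, List<Integer>> placement =
                allocate(Arrays.asList("ABCD", "BCD", "CD", "D"), 10, 4);
        for (Map.Entry<String, List<Integer>> e : placement.entrySet()) {
            // each group's 10 slots land on 10 distinct TaskManagers
            System.out.println(e.getKey() + " -> "
                    + new HashSet<>(e.getValue()).size() + " distinct TMs");
        }
    }
}
```

With these inputs every group ends up on all 10 TaskManagers, matching the "deployed on 10 different TaskManagers" outcome the issue describes, instead of the skewed table shown for Flink 1.12.3.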
[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly
[ https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17737930#comment-17737930 ]

loyi commented on FLINK-23190:
--
[~heigebupahei] Thanks for trying it. Have you enabled the config option *jobmanager.scheduler.enable-slot-allocate-order-optimization* that was added by the patch? If you have already done so, could you provide the example code? I will try to see why it did not work.
[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly
[ https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17737413#comment-17737413 ]

loyi commented on FLINK-23190:
--
[~heigebupahei] Looking forward to your feedback. :)
[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly
[ https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503952#comment-17503952 ]

loyi commented on FLINK-23190:
--
Thank you both for your kind replies.
[jira] [Updated] (FLINK-23190) Make task-slot allocation much more evenly
[ https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

loyi updated FLINK-23190:
--
Flags: (was: Patch)
[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly
[ https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17502196#comment-17502196 ]

loyi commented on FLINK-23190:
--
Hi [~trohrmann], sorry to disturb you. Is this feature still being considered, or does the community have another plan or solution? Please give me some feedback. Thanks.
[jira] [Updated] (FLINK-23190) Make task-slot allocation much more evenly
[ https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

loyi updated FLINK-23190:
--
Affects Version/s: 1.13.1
                   1.14.0
[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly
[ https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17440937#comment-17440937 ]

loyi commented on FLINK-23190:
--
Hi [~trohrmann], it seems to have been a few months since we last talked about this issue. Could I ask you how things are going?
[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly
[ https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17407022#comment-17407022 ]

loyi commented on FLINK-23190:
--
Got it. Thanks.
[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly
[ https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17406616#comment-17406616 ]

loyi commented on FLINK-23190:
--
I have submitted a [PR|https://github.com/apache/flink/pull/16929] for this feature; do you have time to take a look? Thanks. [~trohrmann]
[jira] [Commented] (FLINK-23848) PulsarSourceITCase is failed on Azure
[ https://issues.apache.org/jira/browse/FLINK-23848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17402968#comment-17402968 ] loyi commented on FLINK-23848: -- I met a similar issue. [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=22612=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461=25015] > PulsarSourceITCase is failed on Azure > - > > Key: FLINK-23848 > URL: https://issues.apache.org/jira/browse/FLINK-23848 > Project: Flink > Issue Type: Bug > Components: Connectors / Pulsar >Affects Versions: 1.14.0 >Reporter: Jark Wu >Assignee: Yufan Sheng >Priority: Blocker > Labels: test-stability > Fix For: 1.14.0 > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=22412=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461 > {code} > 2021-08-17T20:11:53.7228789Z Aug 17 20:11:53 [INFO] Running > org.apache.flink.connector.pulsar.source.PulsarSourceITCase > 2021-08-17T20:17:38.2429467Z Aug 17 20:17:38 [ERROR] Tests run: 8, Failures: > 0, Errors: 1, Skipped: 0, Time elapsed: 344.515 s <<< FAILURE! - in > org.apache.flink.connector.pulsar.source.PulsarSourceITCase > 2021-08-17T20:17:38.2430693Z Aug 17 20:17:38 [ERROR] > testMultipleSplits{TestEnvironment, ExternalContext}[2] Time elapsed: 66.766 > s <<< ERROR! 
> 2021-08-17T20:17:38.2431387Z Aug 17 20:17:38 java.lang.RuntimeException: > Failed to fetch next result > 2021-08-17T20:17:38.2432035Z Aug 17 20:17:38 at > org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109) > 2021-08-17T20:17:38.2433345Z Aug 17 20:17:38 at > org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) > 2021-08-17T20:17:38.2434175Z Aug 17 20:17:38 at > org.apache.flink.connectors.test.common.utils.TestDataMatchers$MultipleSplitDataMatcher.matchesSafely(TestDataMatchers.java:151) > 2021-08-17T20:17:38.2435028Z Aug 17 20:17:38 at > org.apache.flink.connectors.test.common.utils.TestDataMatchers$MultipleSplitDataMatcher.matchesSafely(TestDataMatchers.java:133) > 2021-08-17T20:17:38.2438387Z Aug 17 20:17:38 at > org.hamcrest.TypeSafeDiagnosingMatcher.matches(TypeSafeDiagnosingMatcher.java:55) > 2021-08-17T20:17:38.2439100Z Aug 17 20:17:38 at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:12) > 2021-08-17T20:17:38.2439708Z Aug 17 20:17:38 at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8) > 2021-08-17T20:17:38.2440299Z Aug 17 20:17:38 at > org.apache.flink.connectors.test.common.testsuites.SourceTestSuiteBase.testMultipleSplits(SourceTestSuiteBase.java:156) > 2021-08-17T20:17:38.2441007Z Aug 17 20:17:38 at > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > 2021-08-17T20:17:38.2441526Z Aug 17 20:17:38 at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > 2021-08-17T20:17:38.2442068Z Aug 17 20:17:38 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > 2021-08-17T20:17:38.2442759Z Aug 17 20:17:38 at > java.lang.reflect.Method.invoke(Method.java:498) > 2021-08-17T20:17:38.2443247Z Aug 17 20:17:38 at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688) > 2021-08-17T20:17:38.2443812Z Aug 17 
20:17:38 at > org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > 2021-08-17T20:17:38.241Z Aug 17 20:17:38 at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > 2021-08-17T20:17:38.2445101Z Aug 17 20:17:38 at > org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149) > 2021-08-17T20:17:38.2445688Z Aug 17 20:17:38 at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140) > 2021-08-17T20:17:38.2446328Z Aug 17 20:17:38 at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:92) > 2021-08-17T20:17:38.2447303Z Aug 17 20:17:38 at > org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115) > 2021-08-17T20:17:38.2448336Z Aug 17 20:17:38 at > org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105) > 2021-08-17T20:17:38.2448999Z Aug 17 20:17:38 at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > 2021-08-17T20:17:38.2449689Z Aug 17 20:17:38 at >
[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly
[ https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17401642#comment-17401642 ] loyi commented on FLINK-23190: -- [~trohrmann] I'm willing to contribute this feature.
> Make task-slot allocation much more evenly
> --
>
> Key: FLINK-23190
> URL: https://issues.apache.org/jira/browse/FLINK-23190
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Coordination
> Affects Versions: 1.12.3
> Reporter: loyi
> Assignee: loyi
> Priority: Major
> Labels: pull-request-available
> Attachments: image-2021-07-16-10-34-30-700.png
>
[jira] [Comment Edited] (FLINK-23190) Make task-slot allocation much more evenly
[ https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17397921#comment-17397921 ] loyi edited comment on FLINK-23190 at 8/12/21, 8:21 AM: Thanks for your reply [~trohrmann] :) I agree with you that group details shouldn't be put in the SlotRequestId, so I found another solution; below is the idea: As we know, the DefaultScheduler offers slots for PendingRequests in request order (because PendingRequest is stored in a LinkedHashMap), so we can control the ExecutionSlotSharingGroup slot-request order to achieve the same goal, and it only involves the ExecutionSlotAllocator's implementation. Here is the concept code:
{code:java}
@Override
public List<SlotExecutionVertexAssignment> allocateSlotsFor(
        List<ExecutionVertexID> executionVertexIds) {
    // bla bla
    Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> executionsByGroup =
            executionVertexIds.stream()
                    .collect(
                            Collectors.groupingBy(
                                    slotSharingStrategy::getExecutionSlotSharingGroup));
    Map<ExecutionSlotSharingGroup, SharedSlot> slots =
            // Here we can control the ExecutionSlotSharingGroup allocation order
            // instead of a random order. For example, a circular order over the
            // JobVertexGroups.
            sortExecutionSlotSharingGroup(executionsByGroup.keySet()).stream()
                    .map(group -> getOrAllocateSharedSlot(group, sharedSlotProfileRetriever))
                    .collect(
                            Collectors.toMap(
                                    SharedSlot::getExecutionSlotSharingGroup,
                                    Function.identity()));
    // bla bla
}

private Collection<ExecutionSlotSharingGroup> sortExecutionSlotSharingGroup(
        Collection<ExecutionSlotSharingGroup> executionVertexGroups) {
    Map<Set<JobVertexID>, List<ExecutionSlotSharingGroup>> jobVertexGroups =
            executionVertexGroups.stream()
                    .collect(Collectors.groupingBy(this::getJobVertexSharingGroup));
    List<List<ExecutionSlotSharingGroup>> groups =
            jobVertexGroups.entrySet().stream()
                    .sorted((g1, g2) -> Integer.compare(g2.getKey().size(), g1.getKey().size()))
                    .map(Map.Entry::getValue)
                    .collect(Collectors.toList());
    List<ExecutionSlotSharingGroup> sorted = new ArrayList<>();
    int i = executionVertexGroups.size(), j = 0;
    while (i > 0) {
        List<ExecutionSlotSharingGroup> group = groups.get((j++) % groups.size());
        if (group.isEmpty()) {
            continue;
        }
        i--;
        sorted.add(group.remove(0));
    }
    return sorted;
}

Set<JobVertexID> getJobVertexSharingGroup(ExecutionSlotSharingGroup group) {
    return group.getExecutionVertexIds().stream()
            .map(ExecutionVertexID::getJobVertexId)
            .collect(Collectors.toSet());
}
{code}
After my test, it has the same distribution result as the old solution. Do you think it is feasible?

was (Author: loyi): Thanks for your reply [~trohrmann] :) For the DefaultScheduler: I agree with you that group details shouldn't be put in the SlotRequestId, so I found another solution; below is the idea: the DefaultScheduler offers slots for PendingRequests in request order (because PendingRequest is stored in a LinkedHashMap), so we can control the ExecutionSlotSharingGroup slot-request order to achieve the same goal, and it only involves the ExecutionSlotAllocator's implementation.
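The round-robin ordering in the concept code above can be shown with a runnable, simplified sketch. Plain string labels stand in for the real ExecutionSlotSharingGroups, and label length stands in for the size of the job-vertex set; all names here are hypothetical:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Simplified model of sortExecutionSlotSharingGroup: bucket the slot-sharing
// groups by their vertex-set label, order buckets by descending set size, then
// emit requests by cycling over the buckets so each bucket drains evenly.
class GroupRequestOrder {

    static List<String> roundRobinOrder(List<String> groups) {
        Map<String, List<String>> buckets = groups.stream()
                .collect(Collectors.groupingBy(g -> g, LinkedHashMap::new, Collectors.toList()));
        // Label length is a stand-in for the number of job vertices in the group.
        List<List<String>> ordered = buckets.values().stream()
                .sorted((a, b) -> Integer.compare(b.get(0).length(), a.get(0).length()))
                .collect(Collectors.toList());
        List<String> sorted = new ArrayList<>();
        int remaining = groups.size(), j = 0;
        while (remaining > 0) {
            List<String> bucket = ordered.get(j++ % ordered.size());
            if (bucket.isEmpty()) {
                continue; // this bucket is drained; move on to the next one
            }
            remaining--;
            sorted.add(bucket.remove(0));
        }
        return sorted;
    }

    public static void main(String[] args) {
        // Two [ABCD] groups, two [BCD] groups, two [D] groups.
        List<String> order = roundRobinOrder(
                Arrays.asList("ABCD", "ABCD", "BCD", "BCD", "D", "D"));
        System.out.println(order); // cycles ABCD, BCD, D, ABCD, BCD, D
    }
}
```

Because the request order interleaves the groups, consecutive slot requests come from different JobVertexGroups, which is what produces the even placement.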
[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly
[ https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17397921#comment-17397921 ] loyi commented on FLINK-23190: -- Thanks for your reply [~trohrmann] :) For the DefaultScheduler: I agree with you that group details shouldn't be put in the SlotRequestId, so I found another solution; below is the idea: the DefaultScheduler offers slots for PendingRequests in request order (because PendingRequest is stored in a LinkedHashMap), so we can control the ExecutionSlotSharingGroup slot-request order to achieve the same goal, and it only involves the ExecutionSlotAllocator's implementation. Here is the concept code:
{code:java}
@Override
public List<SlotExecutionVertexAssignment> allocateSlotsFor(
        List<ExecutionVertexID> executionVertexIds) {
    // bla bla
    Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> executionsByGroup =
            executionVertexIds.stream()
                    .collect(
                            Collectors.groupingBy(
                                    slotSharingStrategy::getExecutionSlotSharingGroup));
    Map<ExecutionSlotSharingGroup, SharedSlot> slots =
            // Here we can control the ExecutionSlotSharingGroup allocation order
            // instead of a random order. For example, a circular order over the
            // JobVertexGroups.
            sortExecutionSlotSharingGroup(executionsByGroup.keySet()).stream()
                    .map(group -> getOrAllocateSharedSlot(group, sharedSlotProfileRetriever))
                    .collect(
                            Collectors.toMap(
                                    SharedSlot::getExecutionSlotSharingGroup,
                                    Function.identity()));
    // bla bla
}

private Collection<ExecutionSlotSharingGroup> sortExecutionSlotSharingGroup(
        Collection<ExecutionSlotSharingGroup> executionVertexGroups) {
    Map<Set<JobVertexID>, List<ExecutionSlotSharingGroup>> jobVertexGroups =
            executionVertexGroups.stream()
                    .collect(Collectors.groupingBy(this::getJobVertexSharingGroup));
    List<List<ExecutionSlotSharingGroup>> groups =
            jobVertexGroups.entrySet().stream()
                    .sorted((g1, g2) -> Integer.compare(g2.getKey().size(), g1.getKey().size()))
                    .map(Map.Entry::getValue)
                    .collect(Collectors.toList());
    List<ExecutionSlotSharingGroup> sorted = new ArrayList<>();
    int i = executionVertexGroups.size(), j = 0;
    while (i > 0) {
        List<ExecutionSlotSharingGroup> group = groups.get((j++) % groups.size());
        if (group.isEmpty()) {
            continue;
        }
        i--;
        sorted.add(group.remove(0));
    }
    return sorted;
}

Set<JobVertexID> getJobVertexSharingGroup(ExecutionSlotSharingGroup group) {
    return group.getExecutionVertexIds().stream()
            .map(ExecutionVertexID::getJobVertexId)
            .collect(Collectors.toSet());
}
{code}
After my test, it has the same distribution result as the old solution. Do you think it is feasible?
> Make task-slot allocation much more evenly
> --
>
> Key: FLINK-23190
> URL: https://issues.apache.org/jira/browse/FLINK-23190
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Coordination
> Affects Versions: 1.12.3
> Reporter: loyi
> Priority: Major
> Labels: pull-request-available
> Attachments: image-2021-07-16-10-34-30-700.png
>
[jira] [Commented] (FLINK-23595) Allow JSON format deserialize non-numeric numbers
[ https://issues.apache.org/jira/browse/FLINK-23595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17396369#comment-17396369 ] loyi commented on FLINK-23595: -- [~jark] Hi, I would like to contribute this feature; can you assign this ticket to me?
> Allow JSON format deserialize non-numeric numbers
> -
>
> Key: FLINK-23595
> URL: https://issues.apache.org/jira/browse/FLINK-23595
> Project: Flink
> Issue Type: Improvement
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Affects Versions: 1.13.1
> Reporter: loyi
> Priority: Major
>
> I am migrating an old flink-stream system to flink-sql; however, an exception occurs when deserializing JSON with *non-numeric (NaN, Infinity)* numbers.
>
> {noformat}
> Exception stack:
> Caused by: java.io.IOException: Failed to deserialize JSON
> '{"uniqueId":"6974697215254697525","monitorKey":"live-log-list","indicatorMap":{"record-retry-rate":NaN,"request-latency-max":-Infinity,"request-latency-avg":NaN,"buffer-available-bytes":3.3554432E7,"waiting-threads":0.0,"record-error-rate":NaN},"tagMap":{"_aggregate":"RAW","host":"live-log-list-001"},"periodInMs":0,"dataTime":1627903774962,"receiveTime":1627903774965}'.
> at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:149) ~[?:?]
> at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:81) ~[?:?]
> at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:58) ~[?:?]
> at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179) ~[?:?]
> at > org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142) > ~[?:?] > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826) > ~[?:?] > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > Caused by: > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: > Non-standard token 'NaN': enable > JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS to allow > at [Source: UNKNOWN; line: 1, column: 310] > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:710) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2669) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextFieldName(UTF8StreamJsonParser.java:1094) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:268) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:277) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > at > 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:69) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:16) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4635) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > at >
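As the stack trace says, Jackson only accepts these tokens when JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS is enabled. The tokens themselves are already understood by Java's own double parser; the following stdlib-only sketch (helper name hypothetical, not a Flink or Jackson API) shows the token handling the feature switches on:

```java
// Java's Double.parseDouble already understands the non-standard tokens that
// strict JSON rejects (NaN, Infinity, -Infinity); this helper mirrors the
// lenient behaviour that ALLOW_NON_NUMERIC_NUMBERS enables in Jackson.
class NonNumericNumbers {

    /** Parses a JSON number token, additionally accepting NaN, Infinity and -Infinity. */
    static double parseLenient(String token) {
        switch (token) {
            case "NaN":
                return Double.NaN;
            case "Infinity":
                return Double.POSITIVE_INFINITY;
            case "-Infinity":
                return Double.NEGATIVE_INFINITY;
            default:
                return Double.parseDouble(token); // ordinary JSON number
        }
    }

    public static void main(String[] args) {
        // Tokens taken from the failing record above.
        System.out.println(Double.isNaN(parseLenient("NaN")));
        System.out.println(parseLenient("-Infinity") < 0);
        System.out.println(parseLenient("3.3554432E7"));
    }
}
```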
[jira] [Commented] (FLINK-23595) Allow JSON format deserialize non-numeric numbers
[ https://issues.apache.org/jira/browse/FLINK-23595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17395958#comment-17395958 ] loyi commented on FLINK-23595: -- hi [~jark], can this feature be supported?
> Allow JSON format deserialize non-numeric numbers
> -
>
> Key: FLINK-23595
> URL: https://issues.apache.org/jira/browse/FLINK-23595
> Project: Flink
> Issue Type: Improvement
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Affects Versions: 1.13.1
> Reporter: loyi
> Priority: Major
>
[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly
[ https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17395691#comment-17395691 ] loyi commented on FLINK-23190: -- [~trohrmann] hi, do you have a conclusion about this?
> Make task-slot allocation much more evenly
> --
>
> Key: FLINK-23190
> URL: https://issues.apache.org/jira/browse/FLINK-23190
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Coordination
> Affects Versions: 1.12.3
> Reporter: loyi
> Priority: Major
> Labels: pull-request-available
> Attachments: image-2021-07-16-10-34-30-700.png
>
[jira] [Commented] (FLINK-23645) Sqlclient doesn't response CTRL-C correctly before Executor ResultView opening.
[ https://issues.apache.org/jira/browse/FLINK-23645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17394441#comment-17394441 ] loyi commented on FLINK-23645: -- [~jark] yes, please assign it to me
> Sqlclient doesn't response CTRL-C correctly before Executor ResultView
> opening.
> ---
>
> Key: FLINK-23645
> URL: https://issues.apache.org/jira/browse/FLINK-23645
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Client
> Affects Versions: 1.13.1
> Reporter: loyi
> Priority: Minor
> Attachments: image-2021-08-05-18-31-50-487.png, image-2021-08-05-18-44-08-453.png
>
> If we press Ctrl-C before the *ResultView* is opened, the terminal exits without any prompt. For example:
> !image-2021-08-05-18-31-50-487.png!
> After analysis, I found out the reason is that SqlClient doesn't register a *Signal.INT* handler before *callOperation*, so the JVM's default handler executes and exits.
>
> Suggestion:
> We could simply interrupt the Executor when we receive *Signal.INT*:
>
> {code:java}
> private void callOperation(Operation operation, ExecutionMode mode) {
>     validate(operation, mode);
>     final Thread thread = Thread.currentThread();
>     final Terminal.SignalHandler previousHandler =
>             terminal.handle(Terminal.Signal.INT, (signal) -> thread.interrupt());
>     try {
>         if (operation instanceof QuitOperation) {
>             // QUIT/EXIT
>             callQuit();
>         } else if (operation instanceof ClearOperation) {
>             // CLEAR
>             callClear();
>         } else if (operation instanceof HelpOperation) {
>             // HELP
>             callHelp();
>         } else if (operation instanceof SetOperation) {
>             // SET
>             callSet((SetOperation) operation);
>         } else if (operation instanceof ResetOperation) {
>             // RESET
>             callReset((ResetOperation) operation);
>         } else if (operation instanceof CatalogSinkModifyOperation) {
>             // INSERT INTO/OVERWRITE
>             callInsert((CatalogSinkModifyOperation) operation);
>         } else if (operation instanceof QueryOperation) {
>             // SELECT
>             callSelect((QueryOperation) operation);
>         } else if (operation
instanceof ExplainOperation) {
>             // EXPLAIN
>             callExplain((ExplainOperation) operation);
>         } else if (operation instanceof BeginStatementSetOperation) {
>             // BEGIN STATEMENT SET
>             callBeginStatementSet();
>         } else if (operation instanceof EndStatementSetOperation) {
>             // END
>             callEndStatementSet();
>         } else {
>             // fallback to the default implementation
>             executeOperation(operation);
>         }
>     } finally {
>         terminal.handle(Terminal.Signal.INT, previousHandler);
>     }
> }{code}
>
> After the fix:
> !image-2021-08-05-18-44-08-453.png!
-- This message was sent by Atlassian Jira (v8.3.4#803005)
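The save-and-restore handler pattern in the suggestion above can be modeled in isolation. This sketch uses a plain map instead of JLine's Terminal (whose handle method likewise returns the previously installed handler); all names here are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

// Models the fix: install an INT handler that interrupts the calling thread
// for the duration of one operation, and restore the previous handler in a
// finally block even if the operation throws.
class SignalScope {

    interface Handler { void handle(String signal); }

    private final Map<String, Handler> handlers = new HashMap<>();

    /** Installs a handler and returns the one it replaced (may be null). */
    Handler handle(String signal, Handler handler) {
        return handlers.put(signal, handler);
    }

    void raise(String signal) {
        Handler h = handlers.get(signal);
        if (h != null) {
            h.handle(signal);
        }
    }

    /** Runs an operation with a temporary INT handler, restoring the old one after. */
    void runInterruptibly(Runnable operation) {
        Thread current = Thread.currentThread();
        Handler previous = handle("INT", signal -> current.interrupt());
        try {
            operation.run();
        } finally {
            handle("INT", previous); // always restore, even on failure
        }
    }

    public static void main(String[] args) {
        SignalScope scope = new SignalScope();
        scope.handle("INT", signal -> System.out.println("default handler"));
        scope.runInterruptibly(() -> scope.raise("INT")); // interrupts this thread
        System.out.println("interrupted flag: " + Thread.interrupted());
        scope.raise("INT"); // the original handler is back in place
    }
}
```

Restoring in finally is what keeps a Ctrl-C outside callOperation on the JVM's (or terminal's) previous behaviour instead of leaking the temporary handler.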
[jira] [Updated] (FLINK-23645) Sqlclient doesn't response CTRL-C correctly before Executor ResultView opening.
[ https://issues.apache.org/jira/browse/FLINK-23645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] loyi updated FLINK-23645: - Description: If we press Ctrl-C before the *ResultView* is opened, the terminal exits without any prompt. For example: !image-2021-08-05-18-31-50-487.png! After analysis, I found out the reason is that SqlClient doesn't register a *Signal.INT* handler before *callOperation*, so the JVM's default handler executes and exits. Suggestion: We could simply interrupt the Executor when we receive *Signal.INT*:
{code:java}
private void callOperation(Operation operation, ExecutionMode mode) {
    validate(operation, mode);
    final Thread thread = Thread.currentThread();
    final Terminal.SignalHandler previousHandler =
            terminal.handle(Terminal.Signal.INT, (signal) -> thread.interrupt());
    try {
        if (operation instanceof QuitOperation) {
            // QUIT/EXIT
            callQuit();
        } else if (operation instanceof ClearOperation) {
            // CLEAR
            callClear();
        } else if (operation instanceof HelpOperation) {
            // HELP
            callHelp();
        } else if (operation instanceof SetOperation) {
            // SET
            callSet((SetOperation) operation);
        } else if (operation instanceof ResetOperation) {
            // RESET
            callReset((ResetOperation) operation);
        } else if (operation instanceof CatalogSinkModifyOperation) {
            // INSERT INTO/OVERWRITE
            callInsert((CatalogSinkModifyOperation) operation);
        } else if (operation instanceof QueryOperation) {
            // SELECT
            callSelect((QueryOperation) operation);
        } else if (operation instanceof ExplainOperation) {
            // EXPLAIN
            callExplain((ExplainOperation) operation);
        } else if (operation instanceof BeginStatementSetOperation) {
            // BEGIN STATEMENT SET
            callBeginStatementSet();
        } else if (operation instanceof EndStatementSetOperation) {
            // END
            callEndStatementSet();
        } else {
            // fallback to the default implementation
            executeOperation(operation);
        }
    } finally {
        terminal.handle(Terminal.Signal.INT, previousHandler);
    }
}{code}
After the fix: !image-2021-08-05-18-44-08-453.png!
was:
If we press CTRL-C before the *ResultView* opens, the terminal exits without any prompt.

For example:

!image-2021-08-05-18-31-50-487.png!

After analysis, I found that the reason is SqlClient doesn't register a *Signal.INT* handler before *callOperation*, so the default JVM handler runs and exits the process.

Suggestion:

We could simply interrupt the Executor when we receive *Signal.INT*:

{code:java}
private void callOperation(Operation operation, ExecutionMode mode) {
    validate(operation, mode);
    final Thread thread = Thread.currentThread();
    final Terminal.SignalHandler previousHandler =
            terminal.handle(Terminal.Signal.INT, (signal) -> thread.interrupt());
    try {
        if (operation instanceof QuitOperation) {
            // QUIT/EXIT
            callQuit();
        } else if (operation instanceof ClearOperation) {
            // CLEAR
            callClear();
        } else if (operation instanceof HelpOperation) {
            // HELP
            callHelp();
        } else if (operation instanceof SetOperation) {
            // SET
            callSet((SetOperation) operation);
        } else if (operation instanceof ResetOperation) {
            // RESET
            callReset((ResetOperation) operation);
        } else if (operation instanceof CatalogSinkModifyOperation) {
            // INSERT INTO/OVERWRITE
            callInsert((CatalogSinkModifyOperation) operation);
        } else if (operation instanceof QueryOperation) {
            // SELECT
            callSelect((QueryOperation) operation);
        } else if (operation instanceof ExplainOperation) {
            // EXPLAIN
            callExplain((ExplainOperation) operation);
        } else if (operation instanceof BeginStatementSetOperation) {
            // BEGIN STATEMENT SET
            callBeginStatementSet();
        } else if (operation instanceof EndStatementSetOperation) {
            // END
            callEndStatementSet();
        } else {
            // fallback to default implementation
            executeOperation(operation);
        }
    } finally {
        terminal.handle(Terminal.Signal.INT, previousHandler);
    }
}
{code}

> SqlClient doesn't respond to CTRL-C correctly before the Executor ResultView opens.
> ---
>
> Key: FLINK-23645
> URL: https://issues.apache.org/jira/browse/FLINK-23645
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Client
>
[jira] [Updated] (FLINK-23645) SqlClient doesn't respond to CTRL-C correctly before the Executor ResultView opens.
[ https://issues.apache.org/jira/browse/FLINK-23645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] loyi updated FLINK-23645:
-
Attachment: image-2021-08-05-18-44-08-453.png

> SqlClient doesn't respond to CTRL-C correctly before the Executor ResultView opens.
> ---
>
> Key: FLINK-23645
> URL: https://issues.apache.org/jira/browse/FLINK-23645
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Client
> Affects Versions: 1.13.1
> Reporter: loyi
> Priority: Minor
> Attachments: image-2021-08-05-18-31-50-487.png, image-2021-08-05-18-44-08-453.png
>
> If we press CTRL-C before the *ResultView* opens, the terminal exits without any prompt.
> For example:
> !image-2021-08-05-18-31-50-487.png!
> After analysis, I found that the reason is SqlClient doesn't register a *Signal.INT* handler before *callOperation*, so the default JVM handler runs and exits the process.
>
> Suggestion:
> We could simply interrupt the Executor when we receive *Signal.INT*:
>
> {code:java}
> private void callOperation(Operation operation, ExecutionMode mode) {
>     validate(operation, mode);
>     final Thread thread = Thread.currentThread();
>     final Terminal.SignalHandler previousHandler =
>             terminal.handle(Terminal.Signal.INT, (signal) -> thread.interrupt());
>     try {
>         if (operation instanceof QuitOperation) {
>             // QUIT/EXIT
>             callQuit();
>         } else if (operation instanceof ClearOperation) {
>             // CLEAR
>             callClear();
>         } else if (operation instanceof HelpOperation) {
>             // HELP
>             callHelp();
>         } else if (operation instanceof SetOperation) {
>             // SET
>             callSet((SetOperation) operation);
>         } else if (operation instanceof ResetOperation) {
>             // RESET
>             callReset((ResetOperation) operation);
>         } else if (operation instanceof CatalogSinkModifyOperation) {
>             // INSERT INTO/OVERWRITE
>             callInsert((CatalogSinkModifyOperation) operation);
>         } else if (operation instanceof QueryOperation) {
>             // SELECT
>             callSelect((QueryOperation) operation);
>         } else if (operation instanceof ExplainOperation) {
>             // EXPLAIN
>             callExplain((ExplainOperation) operation);
>         } else if (operation instanceof BeginStatementSetOperation) {
>             // BEGIN STATEMENT SET
>             callBeginStatementSet();
>         } else if (operation instanceof EndStatementSetOperation) {
>             // END
>             callEndStatementSet();
>         } else {
>             // fallback to default implementation
>             executeOperation(operation);
>         }
>     } finally {
>         terminal.handle(Terminal.Signal.INT, previousHandler);
>     }
> }
> {code}
>
-- This message was sent by Atlassian Jira (v8.3.4#803005)
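The proposed fix above hinges on one JVM mechanism: the *Signal.INT* handler calls Thread.interrupt() on the thread running the operation, so its blocking call aborts with an InterruptedException instead of the default handler terminating the whole process. A minimal sketch of just that mechanism in plain JDK code (no JLine; the class name, method name, and the second thread standing in for the CTRL-C handler are all illustrative, not part of Flink):

```java
public class InterruptDemo {

    // Runs a blocking "operation" on the current thread while a second
    // thread (standing in for the Terminal.Signal.INT handler) interrupts it.
    static String runCancellable() {
        final Thread operationThread = Thread.currentThread();

        Thread fakeSigintHandler = new Thread(() -> {
            try {
                Thread.sleep(100);          // "user presses CTRL-C" after 100 ms
            } catch (InterruptedException ignored) {
            }
            operationThread.interrupt();    // what the Signal.INT handler does
        });
        fakeSigintHandler.start();

        try {
            Thread.sleep(10_000);           // stand-in for a long-running SELECT
            return "completed";
        } catch (InterruptedException e) {
            return "cancelled";             // operation aborted, JVM keeps running
        }
    }

    public static void main(String[] args) {
        System.out.println(runCancellable()); // prints "cancelled"
    }
}
```

Because the patch restores the previous handler in the finally block, the interrupt-based cancellation only applies while an operation is actually executing.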
[jira] [Updated] (FLINK-23595) Allow JSON format to deserialize non-numeric numbers
[ https://issues.apache.org/jira/browse/FLINK-23595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] loyi updated FLINK-23595:
-
Description:
I am migrating an old flink-stream system to flink-sql; however, an Exception occurs when deserializing JSON with *non-numeric (NaN, Infinity)* numbers.

{noformat}
Exception stack:
Caused by: java.io.IOException: Failed to deserialize JSON '{"uniqueId":"6974697215254697525","monitorKey":"live-log-list","indicatorMap":{"record-retry-rate":NaN,"request-latency-max":-Infinity,"request-latency-avg":NaN,"buffer-available-bytes":3.3554432E7,"waiting-threads":0.0,"record-error-rate":NaN},"tagMap":{"_aggregate":"RAW","host":"live-log-list-001"},"periodInMs":0,"dataTime":1627903774962,"receiveTime":1627903774965}'.
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:149) ~[?:?]
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:81) ~[?:?]
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:58) ~[?:?]
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179) ~[?:?]
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142) ~[?:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826) ~[?:?]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Non-standard token 'NaN': enable JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS to allow
 at [Source: UNKNOWN; line: 1, column: 310]
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:710) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2669) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextFieldName(UTF8StreamJsonParser.java:1094) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:268) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:277) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:69) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:16) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4635) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:3056) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:142) ~[?:?]
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:81) ~[?:?]
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at
[jira] [Updated] (FLINK-23595) Allow JSON format to deserialize non-numeric numbers
[ https://issues.apache.org/jira/browse/FLINK-23595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] loyi updated FLINK-23595:
-
Environment: (was: {noformat} {noformat})

> Allow JSON format to deserialize non-numeric numbers
> -
>
> Key: FLINK-23595
> URL: https://issues.apache.org/jira/browse/FLINK-23595
> Project: Flink
> Issue Type: Improvement
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Affects Versions: 1.13.1
> Reporter: loyi
> Priority: Major
>
> I am migrating an old flink-stream system to flink-sql; however, an Exception occurs when deserializing JSON with *non-numeric (NaN, Infinity)* numbers.
>
> Suggestion:
> I notice that *JsonRowDataDeserializationSchema* uses jackson as its default implementation, and jackson doesn't enable parsing non-numeric numbers by default. For backward compatibility, we could add an option `allow-non-numeric-numbers` to enable this feature.
>
-- This message was sent by Atlassian Jira (v8.3.4#803005)
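The suggestion above can be checked outside Flink by enabling the feature directly on a Jackson ObjectMapper, since JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS is exactly the feature named in the exception message. A minimal sketch (assumes Jackson is on the classpath; the class and method names are illustrative, not Flink or Jackson API):

```java
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class NonNumericNumbersDemo {

    // Parses a JSON document containing the non-standard token NaN.
    // With default settings readTree throws JsonParseException on 'NaN';
    // with ALLOW_NON_NUMERIC_NUMBERS enabled it yields Double.NaN.
    static double parseNaN() throws Exception {
        ObjectMapper mapper = new ObjectMapper()
                .enable(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS);
        JsonNode node = mapper.readTree("{\"record-retry-rate\":NaN}");
        return node.get("record-retry-rate").doubleValue();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(Double.isNaN(parseNaN())); // prints "true"
    }
}
```

A format option like the proposed `allow-non-numeric-numbers` would simply toggle this parser feature on the mapper that the JSON deserialization schema builds internally, which is why it can stay off by default for backward compatibility.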
[jira] [Updated] (FLINK-23595) Allow JSON format to deserialize non-numeric numbers
[ https://issues.apache.org/jira/browse/FLINK-23595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] loyi updated FLINK-23595:
-
Environment:
{noformat}
{noformat}

was:
{noformat}
Exception stack:
Caused by: java.io.IOException: Failed to deserialize JSON '{"uniqueId":"6974697215254697525","monitorKey":"live-log-list","indicatorMap":{"record-retry-rate":NaN,"request-latency-max":-Infinity,"request-latency-avg":NaN,"buffer-available-bytes":3.3554432E7,"waiting-threads":0.0,"record-error-rate":NaN},"tagMap":{"_aggregate":"RAW","host":"live-log-list-001"},"periodInMs":0,"dataTime":1627903774962,"receiveTime":1627903774965}'.
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:149) ~[?:?]
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:81) ~[?:?]
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:58) ~[?:?]
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179) ~[?:?]
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142) ~[?:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826) ~[?:?]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Non-standard token 'NaN': enable JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS to allow
 at [Source: UNKNOWN; line: 1, column: 310]
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:710) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2669) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextFieldName(UTF8StreamJsonParser.java:1094) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:268) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:277) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:69) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:16) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4635) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:3056) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:142) ~[?:?]
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:81) ~[?:?]
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:58) ~[?:?]
    at
[jira] [Updated] (FLINK-23645) SqlClient doesn't respond to CTRL-C correctly before the Executor ResultView opens.
[ https://issues.apache.org/jira/browse/FLINK-23645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] loyi updated FLINK-23645:
-
Description:
If we press CTRL-C before the *ResultView* opens, the terminal exits without any prompt.

For example:

!image-2021-08-05-18-31-50-487.png!

After analysis, I found that the reason is SqlClient doesn't register a *Signal.INT* handler before *callOperation*, so the default JVM handler runs and exits the process.

Suggestion:

We could simply interrupt the Executor when we receive *Signal.INT*:

{code:java}
private void callOperation(Operation operation, ExecutionMode mode) {
    validate(operation, mode);
    final Thread thread = Thread.currentThread();
    final Terminal.SignalHandler previousHandler =
            terminal.handle(Terminal.Signal.INT, (signal) -> thread.interrupt());
    try {
        if (operation instanceof QuitOperation) {
            // QUIT/EXIT
            callQuit();
        } else if (operation instanceof ClearOperation) {
            // CLEAR
            callClear();
        } else if (operation instanceof HelpOperation) {
            // HELP
            callHelp();
        } else if (operation instanceof SetOperation) {
            // SET
            callSet((SetOperation) operation);
        } else if (operation instanceof ResetOperation) {
            // RESET
            callReset((ResetOperation) operation);
        } else if (operation instanceof CatalogSinkModifyOperation) {
            // INSERT INTO/OVERWRITE
            callInsert((CatalogSinkModifyOperation) operation);
        } else if (operation instanceof QueryOperation) {
            // SELECT
            callSelect((QueryOperation) operation);
        } else if (operation instanceof ExplainOperation) {
            // EXPLAIN
            callExplain((ExplainOperation) operation);
        } else if (operation instanceof BeginStatementSetOperation) {
            // BEGIN STATEMENT SET
            callBeginStatementSet();
        } else if (operation instanceof EndStatementSetOperation) {
            // END
            callEndStatementSet();
        } else {
            // fallback to default implementation
            executeOperation(operation);
        }
    } finally {
        terminal.handle(Terminal.Signal.INT, previousHandler);
    }
}
{code}

was:
If we press CTRL-C before the *ResultView* opens, the terminal exits without any prompt.

For example:

{noformat}
Flink SQL> select 1;
^C
[bot@ bin]$
{noformat}

After analysis, I found that the reason is SqlClient doesn't register a *Signal.INT* handler before *callOperation*, so the default JVM handler runs and exits the process.

Suggestion:

We could simply interrupt the Executor when we receive *Signal.INT*:

{code:java}
private void callOperation(Operation operation, ExecutionMode mode) {
    validate(operation, mode);
    final Thread thread = Thread.currentThread();
    final Terminal.SignalHandler previousHandler =
            terminal.handle(Terminal.Signal.INT, (signal) -> thread.interrupt());
    try {
        if (operation instanceof QuitOperation) {
            // QUIT/EXIT
            callQuit();
        } else if (operation instanceof ClearOperation) {
            // CLEAR
            callClear();
        } else if (operation instanceof HelpOperation) {
            // HELP
            callHelp();
        } else if (operation instanceof SetOperation) {
            // SET
            callSet((SetOperation) operation);
        } else if (operation instanceof ResetOperation) {
            // RESET
            callReset((ResetOperation) operation);
        } else if (operation instanceof CatalogSinkModifyOperation) {
            // INSERT INTO/OVERWRITE
            callInsert((CatalogSinkModifyOperation) operation);
        } else if (operation instanceof QueryOperation) {
            // SELECT
            callSelect((QueryOperation) operation);
        } else if (operation instanceof ExplainOperation) {
            // EXPLAIN
            callExplain((ExplainOperation) operation);
        } else if (operation instanceof BeginStatementSetOperation) {
            // BEGIN STATEMENT SET
            callBeginStatementSet();
        } else if (operation instanceof EndStatementSetOperation) {
            // END
            callEndStatementSet();
        } else {
            // fallback to default implementation
            executeOperation(operation);
        }
    } finally {
        terminal.handle(Terminal.Signal.INT, previousHandler);
    }
}
{code}

> SqlClient doesn't respond to CTRL-C correctly before the Executor ResultView opens.
> ---
>
> Key: FLINK-23645
> URL: https://issues.apache.org/jira/browse/FLINK-23645
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Client
> Affects Versions: 1.13.1
>
[jira] [Updated] (FLINK-23645) SqlClient doesn't respond to CTRL-C correctly before the Executor ResultView opens.
[ https://issues.apache.org/jira/browse/FLINK-23645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] loyi updated FLINK-23645:
-
Description:
If we press CTRL-C before the *ResultView* opens, the terminal exits without any prompt.

For example:

{noformat}
Flink SQL> select 1;
^C
[bot@ bin]$
{noformat}

After analysis, I found that the reason is SqlClient doesn't register a *Signal.INT* handler before *callOperation*, so the default JVM handler runs and exits the process.

Suggestion:

We could simply interrupt the Executor when we receive *Signal.INT*:

{code:java}
private void callOperation(Operation operation, ExecutionMode mode) {
    validate(operation, mode);
    final Thread thread = Thread.currentThread();
    final Terminal.SignalHandler previousHandler =
            terminal.handle(Terminal.Signal.INT, (signal) -> thread.interrupt());
    try {
        if (operation instanceof QuitOperation) {
            // QUIT/EXIT
            callQuit();
        } else if (operation instanceof ClearOperation) {
            // CLEAR
            callClear();
        } else if (operation instanceof HelpOperation) {
            // HELP
            callHelp();
        } else if (operation instanceof SetOperation) {
            // SET
            callSet((SetOperation) operation);
        } else if (operation instanceof ResetOperation) {
            // RESET
            callReset((ResetOperation) operation);
        } else if (operation instanceof CatalogSinkModifyOperation) {
            // INSERT INTO/OVERWRITE
            callInsert((CatalogSinkModifyOperation) operation);
        } else if (operation instanceof QueryOperation) {
            // SELECT
            callSelect((QueryOperation) operation);
        } else if (operation instanceof ExplainOperation) {
            // EXPLAIN
            callExplain((ExplainOperation) operation);
        } else if (operation instanceof BeginStatementSetOperation) {
            // BEGIN STATEMENT SET
            callBeginStatementSet();
        } else if (operation instanceof EndStatementSetOperation) {
            // END
            callEndStatementSet();
        } else {
            // fallback to default implementation
            executeOperation(operation);
        }
    } finally {
        terminal.handle(Terminal.Signal.INT, previousHandler);
    }
}
{code}

was:
If we press CTRL-C before the *ResultView* opens, the terminal exits without any prompt.

For example:

{noformat}
Flink SQL> select 1;
^C
[bot@ bin]$
{noformat}{noformat}

After analysis, I found that the reason is SqlClient doesn't register a *Signal.INT* handler before *callOperation*, so the default JVM handler runs and exits the process.

Suggestion:

We could simply interrupt the Executor when we receive *Signal.INT*:

{code:java}
private void callOperation(Operation operation, ExecutionMode mode) {
    validate(operation, mode);
    final Thread thread = Thread.currentThread();
    final Terminal.SignalHandler previousHandler =
            terminal.handle(Terminal.Signal.INT, (signal) -> thread.interrupt());
    try {
        if (operation instanceof QuitOperation) {
            // QUIT/EXIT
            callQuit();
        } else if (operation instanceof ClearOperation) {
            // CLEAR
            callClear();
        } else if (operation instanceof HelpOperation) {
            // HELP
            callHelp();
        } else if (operation instanceof SetOperation) {
            // SET
            callSet((SetOperation) operation);
        } else if (operation instanceof ResetOperation) {
            // RESET
            callReset((ResetOperation) operation);
        } else if (operation instanceof CatalogSinkModifyOperation) {
            // INSERT INTO/OVERWRITE
            callInsert((CatalogSinkModifyOperation) operation);
        } else if (operation instanceof QueryOperation) {
            // SELECT
            callSelect((QueryOperation) operation);
        } else if (operation instanceof ExplainOperation) {
            // EXPLAIN
            callExplain((ExplainOperation) operation);
        } else if (operation instanceof BeginStatementSetOperation) {
            // BEGIN STATEMENT SET
            callBeginStatementSet();
        } else if (operation instanceof EndStatementSetOperation) {
            // END
            callEndStatementSet();
        } else {
            // fallback to default implementation
            executeOperation(operation);
        }
    } finally {
        terminal.handle(Terminal.Signal.INT, previousHandler);
    }
}
{code}

> SqlClient doesn't respond to CTRL-C correctly before the Executor ResultView opens.
> ---
>
> Key: FLINK-23645
> URL: https://issues.apache.org/jira/browse/FLINK-23645
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Client
>
[jira] [Updated] (FLINK-23645) SqlClient doesn't respond to CTRL-C correctly before the Executor ResultView opens.
[ https://issues.apache.org/jira/browse/FLINK-23645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] loyi updated FLINK-23645:
-
Attachment: image-2021-08-05-18-31-50-487.png

> SqlClient doesn't respond to CTRL-C correctly before the Executor ResultView opens.
> ---
>
> Key: FLINK-23645
> URL: https://issues.apache.org/jira/browse/FLINK-23645
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Client
> Affects Versions: 1.13.1
> Reporter: loyi
> Priority: Minor
> Attachments: image-2021-08-05-18-31-50-487.png
>
> If we press CTRL-C before the *ResultView* opens, the terminal exits without any prompt.
> For example:
> {noformat}
> Flink SQL> select 1;
> ^C
> [bot@ bin]$
> {noformat}
> After analysis, I found that the reason is SqlClient doesn't register a *Signal.INT* handler before *callOperation*, so the default JVM handler runs and exits the process.
>
> Suggestion:
> We could simply interrupt the Executor when we receive *Signal.INT*:
>
> {code:java}
> private void callOperation(Operation operation, ExecutionMode mode) {
>     validate(operation, mode);
>     final Thread thread = Thread.currentThread();
>     final Terminal.SignalHandler previousHandler =
>             terminal.handle(Terminal.Signal.INT, (signal) -> thread.interrupt());
>     try {
>         if (operation instanceof QuitOperation) {
>             // QUIT/EXIT
>             callQuit();
>         } else if (operation instanceof ClearOperation) {
>             // CLEAR
>             callClear();
>         } else if (operation instanceof HelpOperation) {
>             // HELP
>             callHelp();
>         } else if (operation instanceof SetOperation) {
>             // SET
>             callSet((SetOperation) operation);
>         } else if (operation instanceof ResetOperation) {
>             // RESET
>             callReset((ResetOperation) operation);
>         } else if (operation instanceof CatalogSinkModifyOperation) {
>             // INSERT INTO/OVERWRITE
>             callInsert((CatalogSinkModifyOperation) operation);
>         } else if (operation instanceof QueryOperation) {
>             // SELECT
>             callSelect((QueryOperation) operation);
>         } else if (operation instanceof ExplainOperation) {
>             // EXPLAIN
>             callExplain((ExplainOperation) operation);
>         } else if (operation instanceof BeginStatementSetOperation) {
>             // BEGIN STATEMENT SET
>             callBeginStatementSet();
>         } else if (operation instanceof EndStatementSetOperation) {
>             // END
>             callEndStatementSet();
>         } else {
>             // fallback to default implementation
>             executeOperation(operation);
>         }
>     } finally {
>         terminal.handle(Terminal.Signal.INT, previousHandler);
>     }
> }
> {code}
>
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-23645) SqlClient doesn't respond to CTRL-C correctly before the Executor ResultView opens.
[ https://issues.apache.org/jira/browse/FLINK-23645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] loyi updated FLINK-23645:
-
Description:
If we press CTRL-C before the *ResultView* opens, the terminal exits without any prompt.

For example:

{noformat}
Flink SQL> select 1;
^C
[bot@ bin]$
{noformat}{noformat}

After analysis, I found that the reason is SqlClient doesn't register a *Signal.INT* handler before *callOperation*, so the default JVM handler runs and exits the process.

Suggestion:

We could simply interrupt the Executor when we receive *Signal.INT*:

{code:java}
private void callOperation(Operation operation, ExecutionMode mode) {
    validate(operation, mode);
    final Thread thread = Thread.currentThread();
    final Terminal.SignalHandler previousHandler =
            terminal.handle(Terminal.Signal.INT, (signal) -> thread.interrupt());
    try {
        if (operation instanceof QuitOperation) {
            // QUIT/EXIT
            callQuit();
        } else if (operation instanceof ClearOperation) {
            // CLEAR
            callClear();
        } else if (operation instanceof HelpOperation) {
            // HELP
            callHelp();
        } else if (operation instanceof SetOperation) {
            // SET
            callSet((SetOperation) operation);
        } else if (operation instanceof ResetOperation) {
            // RESET
            callReset((ResetOperation) operation);
        } else if (operation instanceof CatalogSinkModifyOperation) {
            // INSERT INTO/OVERWRITE
            callInsert((CatalogSinkModifyOperation) operation);
        } else if (operation instanceof QueryOperation) {
            // SELECT
            callSelect((QueryOperation) operation);
        } else if (operation instanceof ExplainOperation) {
            // EXPLAIN
            callExplain((ExplainOperation) operation);
        } else if (operation instanceof BeginStatementSetOperation) {
            // BEGIN STATEMENT SET
            callBeginStatementSet();
        } else if (operation instanceof EndStatementSetOperation) {
            // END
            callEndStatementSet();
        } else {
            // fallback to default implementation
            executeOperation(operation);
        }
    } finally {
        terminal.handle(Terminal.Signal.INT, previousHandler);
    }
}
{code}

was:
If we press CTRL-C before the *ResultView* opens, the terminal exits without any prompt.

For example:

{noformat}
Flink SQL> select 1;
^C
[bot@ bin]$
{noformat}

After analysis, I found that the reason is SqlClient doesn't register a *Signal.INT* handler before *callOperation*, so the default JVM handler runs and exits the process.

Suggestion:

We could simply interrupt the Executor when we receive *Signal.INT*:

{code:java}
private void callOperation(Operation operation, ExecutionMode mode) {
    validate(operation, mode);
    final Thread thread = Thread.currentThread();
    final Terminal.SignalHandler previousHandler =
            terminal.handle(Terminal.Signal.INT, (signal) -> thread.interrupt());
    try {
        if (operation instanceof QuitOperation) {
            // QUIT/EXIT
            callQuit();
        } else if (operation instanceof ClearOperation) {
            // CLEAR
            callClear();
        } else if (operation instanceof HelpOperation) {
            // HELP
            callHelp();
        } else if (operation instanceof SetOperation) {
            // SET
            callSet((SetOperation) operation);
        } else if (operation instanceof ResetOperation) {
            // RESET
            callReset((ResetOperation) operation);
        } else if (operation instanceof CatalogSinkModifyOperation) {
            // INSERT INTO/OVERWRITE
            callInsert((CatalogSinkModifyOperation) operation);
        } else if (operation instanceof QueryOperation) {
            // SELECT
            callSelect((QueryOperation) operation);
        } else if (operation instanceof ExplainOperation) {
            // EXPLAIN
            callExplain((ExplainOperation) operation);
        } else if (operation instanceof BeginStatementSetOperation) {
            // BEGIN STATEMENT SET
            callBeginStatementSet();
        } else if (operation instanceof EndStatementSetOperation) {
            // END
            callEndStatementSet();
        } else {
            // fallback to default implementation
            executeOperation(operation);
        }
    } finally {
        terminal.handle(Terminal.Signal.INT, previousHandler);
    }
}
{code}

> SqlClient doesn't respond to CTRL-C correctly before the Executor ResultView opens.
> ---
>
> Key: FLINK-23645
> URL: https://issues.apache.org/jira/browse/FLINK-23645
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Client
>
[jira] [Created] (FLINK-23645) Sqlclient doesn't response CTRL-C correctly before Executor ResultView opening.
loyi created FLINK-23645: Summary: Sqlclient doesn't response CTRL-C correctly before Executor ResultView opening. Key: FLINK-23645 URL: https://issues.apache.org/jira/browse/FLINK-23645 Project: Flink Issue Type: Bug Components: Table SQL / Client Affects Versions: 1.13.1 Reporter: loyi If we press ctrl-c before the *ResultView* opens, the terminal exits without any prompt. {noformat} Flink SQL> select 1; ^C [bot@ bin]${noformat} -- This message was sent by Atlassian Jira (v8.3.4#803005)
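The proposed fix cancels the in-flight operation by interrupting the client thread from the SIGINT handler. A minimal JDK-only sketch of that interrupt mechanism (names are illustrative; the real SqlClient would install the handler via JLine's Terminal.handle(Terminal.Signal.INT, ...)):

```java
public class InterruptSketch {
    public static void main(String[] args) {
        final Thread worker = Thread.currentThread();
        // Stand-in for the Signal.INT handler: another thread interrupts us,
        // just as (signal) -> thread.interrupt() would on Ctrl-C.
        new Thread(() -> {
            try { Thread.sleep(100); } catch (InterruptedException ignored) { }
            worker.interrupt();
        }).start();
        try {
            Thread.sleep(10_000); // stand-in for a blocking callSelect(...)
            System.out.println("finished");
        } catch (InterruptedException e) {
            // Only the current operation is cancelled; the JVM keeps running.
            System.out.println("interrupted");
        }
    }
}
```

With such a handler registered, Ctrl-C unwinds only the current statement, and the finally block in the proposal restores the previous handler so the prompt stays alive.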
[jira] [Created] (FLINK-23595) Allow JSON format deserialize non-numeric numbers
loyi created FLINK-23595: Summary: Allow JSON format deserialize non-numeric numbers Key: FLINK-23595 URL: https://issues.apache.org/jira/browse/FLINK-23595 Project: Flink Issue Type: Improvement Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Affects Versions: 1.13.1 Environment: {noformat} Exception stack: Caused by: java.io.IOException: Failed to deserialize JSON '{"uniqueId":"6974697215254697525","monitorKey":"live-log-list","indicatorMap":{"record-retry-rate":NaN,"request-latency-max":-Infinity,"request-latency-avg":NaN,"buffer-available-bytes":3.3554432E7,"waiting-threads":0.0,"record-error-rate":NaN},"tagMap":{"_aggregate":"RAW","host":"live-log-list-001"},"periodInMs":0,"dataTime":1627903774962,"receiveTime":1627903774965}'. at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:149) ~[?:?] at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:81) ~[?:?] at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:58) ~[?:?] at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179) ~[?:?] at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142) ~[?:?] at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826) ~[?:?] 
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269) ~[flink-dist_2.11-1.13.1.jar:1.13.1] Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Non-standard token 'NaN': enable JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS to allow at [Source: UNKNOWN; line: 1, column: 310] at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:710) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2669) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextFieldName(UTF8StreamJsonParser.java:1094) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:268) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:277) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:69) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:16) 
~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4635) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:3056) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:142) ~[?:?] at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:81) ~[?:?] at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
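The trace pinpoints Jackson's JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, which is disabled by default, so the NaN and -Infinity tokens in the record are rejected. The values themselves are ordinary IEEE-754 doubles; a JDK-only sketch showing that Java parses the same spellings (illustrative only — the actual fix would enable the Jackson feature on the JSON format's ObjectMapper):

```java
public class NonNumericTokens {
    public static void main(String[] args) {
        // Strict JSON (RFC 8259) has no literals for these, but
        // java.lang.Double accepts the spellings seen in the failing record.
        double nan = Double.parseDouble("NaN");
        double negInf = Double.parseDouble("-Infinity");
        System.out.println(Double.isNaN(nan));                  // true
        System.out.println(negInf == Double.NEGATIVE_INFINITY); // true
    }
}
```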
[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly
[ https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17384608#comment-17384608 ] loyi commented on FLINK-23190: -- [~trohrmann] Hello, should I provide more results? Do these prove that the solution is feasible? Looking forward to your reply. > Make task-slot allocation much more evenly > -- > > Key: FLINK-23190 > URL: https://issues.apache.org/jira/browse/FLINK-23190 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.12.3 >Reporter: loyi >Priority: Major > Attachments: image-2021-07-16-10-34-30-700.png > > > FLINK-12122 only guarantees spreading out tasks across the set of TMs which > are registered at the time of scheduling, but our jobs all run in active > YARN mode, and jobs with a smaller source parallelism often cause > load-balance issues. > > For this job: > {code:java} > // -ys 4 means 10 taskmanagers > env.addSource(...).name("A").setParallelism(10) > .map(...).name("B").setParallelism(30) > .map(...).name("C").setParallelism(40) > .addSink(...).name("D").setParallelism(20); > {code} > > Flink-1.12.3 task allocation: > ||operator||tm1||tm2||tm3||tm4||tm5||tm6||tm7||tm8||tm9||tm10|| > |A|1|{color:#de350b}2{color}|{color:#de350b}2{color}|1|1|{color:#de350b}3{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}| > |B|3|3|3|3|3|3|3|3|{color:#de350b}2{color}|{color:#de350b}4{color}| > |C|4|4|4|4|4|4|4|4|4|4| > |D|2|2|2|2|2|{color:#de350b}1{color}|{color:#de350b}1{color}|2|2|{color:#de350b}4{color}| > > Suggestions: > When TaskManagers register their slots with the SlotManager, the current > logic picks the first pendingSlot that meets the resource requirements. > This "random" strategy usually causes uneven task allocation when the > source operator's parallelism is significantly below the processing > operators'. > A simple, feasible idea is to {color:#de350b}partition{color} the current > "{color:#de350b}pendingSlots{color}" by their "JobVertexIds" (for example by > letting the AllocationID carry that detail), then allocate the slots > proportionally to each JobVertexGroup. > > For the above case, the 40 pendingSlots can be divided into 4 groups: > [ABCD]: 10 // A, B, C, D represent {color:#de350b}jobVertexIds{color} > [BCD]: 10 > [CD]: 10 > [D]: 10 > > Every taskmanager provides 4 slots at a time, and each group gets 1 slot > according to its proportion (1/4); the final allocation result is: > [ABCD]: deployed on 10 different taskmanagers > [BCD]: deployed on 10 different taskmanagers > [CD]: deployed on 10 different taskmanagers > [D]: deployed on 10 different taskmanagers > > I have implemented a [concept > code|https://github.com/saddays/flink/commit/dc82e60a7c7599fbcb58c14f8e3445bc8d07ace1] > based on Flink-1.12.3; the patched version achieves {color:#de350b}fully > even{color} task allocation and works well on my workload. Are there other > points that have not been considered, or does it conflict with future > plans? Sorry for my poor English. > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
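The grouping scheme above can be sketched in a few lines. This is a hypothetical simplification (the group names stand in for sets of JobVertexIds, and the real change would live in the SlotManager's pending-slot matching):

```java
import java.util.*;

public class ProportionalSlots {
    // Pick which vertex groups the slots of one registering TaskManager
    // should serve, always favouring the group with the most pending slots.
    static List<String> allocate(Map<String, Integer> pending, int slotsOffered) {
        List<String> picked = new ArrayList<>();
        for (int i = 0; i < slotsOffered; i++) {
            String best = null;
            for (Map.Entry<String, Integer> e : pending.entrySet()) {
                if (e.getValue() > 0 && (best == null || e.getValue() > pending.get(best))) {
                    best = e.getKey();
                }
            }
            if (best == null) break; // nothing left to place
            pending.put(best, pending.get(best) - 1);
            picked.add(best);
        }
        return picked;
    }

    public static void main(String[] args) {
        // The four equal groups from the example, 10 pending slots each:
        Map<String, Integer> pending = new LinkedHashMap<>();
        pending.put("ABCD", 10);
        pending.put("BCD", 10);
        pending.put("CD", 10);
        pending.put("D", 10);
        // A TaskManager offering 4 slots serves each group exactly once:
        System.out.println(allocate(pending, 4)); // [ABCD, BCD, CD, D]
    }
}
```

Repeating this for all 10 TaskManagers drains every group evenly, which is exactly the "deployed on 10 different taskmanagers" outcome described above.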
[jira] [Comment Edited] (FLINK-23190) Make task-slot allocation much more evenly
[ https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17381794#comment-17381794 ] loyi edited comment on FLINK-23190 at 7/16/21, 6:06 AM: [~trohrmann] Thanks for your patience. I have implemented a version based on flink-1.13.1 and will use two real examples for comparison below. h2. Example 1: This job can achieve completely even allocation in theory. {code:java} // job description StreamExecutionEnvironment env; env.addSource(...).name("a").setParallelism(10) .map(...).name("b").setParallelism(30) .map(...).name("c").setParallelism(40) .addSink(...).name("d").setParallelism(20); // We specify -ys 4, which means the job will apply for 10 TaskExecutors. flink run -yjm 2048 -ytm 2048 -ys 4 -m yarn-cluster -c com.loyi.flink.demo.Bootstrap -d /home/deploy/flink-demo.jar {code} !image-2021-07-16-10-34-30-700.png|width=1042,height=219! *flink-1.13.1-official* (default config) operator's subtask distribution: ||Operator||TM-1||TM-2||TM-3||TM-4||TM-5||TM-6||TM-7||TM-8||TM-9||TM-10|| |a|0|1|2|0|0|1|2|2|2|0| |b|4|1|3|3|4|4|3|3|3|2| |c|4|4|4|4|4|4|4|4|4|4| |d|3|1|3|1|4|2|2|2|0|2| *flink-1.13.1-official* (with cluster.evenly-spread-out-slots: true) operator's subtask distribution: ||Operator||TM-1||TM-2||TM-3||TM-4||TM-5||TM-6||TM-7||TM-8||TM-9||TM-10|| |a|1|1|0|2|0|3|1|0|0|2| |b|4|1|3|4|4|3|4|2|3|2| |c|4|4|4|4|4|4|4|4|4|4| |d|3|1|3|2|3|2|2|1|2|1| *flink-1.13.1-patch* (default config) operator's subtask distribution: ||Operator||TM-1||TM-2||TM-3||TM-4||TM-5||TM-6||TM-7||TM-8||TM-9||TM-10|| |a|1|1|1|1|1|1|1|1|1|1| |b|3|3|3|3|3|3|3|3|3|3| |c|4|4|4|4|4|4|4|4|4|4| |d|2|2|2|2|2|2|2|2|2|2| *flink-1.13.1-patch* (with jobmanager.scheduler: adaptive) operator's subtask distribution: ||Operator||TM-1||TM-2||TM-3||TM-4||TM-5||TM-6||TM-7||TM-8||TM-9||TM-10|| |a|1|1|1|1|1|1|1|1|1|1| |b|3|3|3|3|3|3|3|3|3|3| |c|4|4|4|4|4|4|4|4|4|4| |d|2|2|2|2|2|2|2|2|2|2| h2. 
Example 2: We construct a job that cannot achieve absolutely even allocation. {code:java} // job description is the same as in the example above. StreamExecutionEnvironment env; env.addSource(...).name("a").setParallelism(10) .map(...).name("b").setParallelism(30) .map(...).name("c").setParallelism(40) .addSink(...).name("d").setParallelism(20); // We specify -ys 3, which means the job will apply for 14 TaskExecutors. flink run -yjm 2048 -ytm 2048 -ys 3 -m yarn-cluster -c com.loyi.flink.demo.Bootstrap -d /home/deploy/flink-demo.jar {code} *flink-1.13.1-official* (default config) operator's subtask distribution: ||Operator||TM-1||TM-2||TM-3||TM-4||TM-5||TM-6||TM-7||TM-8||TM-9||TM-10||TM-11||TM-12||TM-13||TM-14|| |a|3|1|0|3|1|1|1|0|0|0|0|0|0|0| |b|3|3|3|3|2|2|2|1|3|2|3|2|1|0| |c|3|3|3|3|3|3|3|3|3|1|3|3|3|3| |d|2|2|1|2|3|1|1|1|1|1|2|2|1|0| *flink-1.13.1-official* (with cluster.evenly-spread-out-slots: true) operator's subtask distribution: ||Operator||TM-1||TM-2||TM-3||TM-4||TM-5||TM-6||TM-7||TM-8||TM-9||TM-10||TM-11||TM-12||TM-13||TM-14|| |a|2|1|1|1|2|2|1|0|0|0|0|0|0|0| |b|3|3|2|1|1|3|3|2|1|3|2|3|3|0| |c|3|3|3|1|3|3|3|3|3|3|3|3|3|3| |d|1|2|2|1|2|3|1|2|2|3|1|0|0|0| *flink-1.13.1-patch* (default config) operator's subtask distribution: ||Operator||TM-1||TM-2||TM-3||TM-4||TM-5||TM-6||TM-7||TM-8||TM-9||TM-10||TM-11||TM-12||TM-13||TM-14|| |a|1|1|1|1|1|0|1|1|0|1|0|1|0|1| |b|3|2|2|2|2|2|3|2|2|2|2|2|2|2| |c|3|3|3|3|3|3|3|3|3|3|3|3|1|3| |d|2|1|2|1|1|1|2|2|1|2|1|2|2|0| *flink-1.13.1-patch* (with jobmanager.scheduler: adaptive) operator's subtask distribution: ||Operator||TM-1||TM-2||TM-3||TM-4||TM-5||TM-6||TM-7||TM-8||TM-9||TM-10||TM-11||TM-12||TM-13||TM-14|| |a|1|0|0|1|1|1|1|0|0|1|1|1|1|1| |b|2|2|2|2|3|3|3|2|1|2|2|2|2|2| |c|3|3|3|3|3|3|3|3|1|3|3|3|3|3| |d|1|1|1|1|2|2|2|1|1|2|1|2|1|2| We can see that the patch allocates tasks much more evenly, spreading each operator across all TaskExecutors as evenly as it can. 
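The TaskExecutor counts in both examples follow from simple slot arithmetic: the widest operator ("c") needs 40 slots, so -ys 4 yields 40/4 = 10 TaskExecutors and -ys 3 yields ceil(40/3) = 14. A quick illustrative check (the method name is mine, not Flink's):

```java
public class SlotMath {
    // Number of TaskExecutors requested on active YARN: the ceiling of the
    // widest operator's parallelism over the slots per TaskManager.
    static int taskExecutors(int maxParallelism, int slotsPerTm) {
        return (maxParallelism + slotsPerTm - 1) / slotsPerTm;
    }

    public static void main(String[] args) {
        System.out.println(taskExecutors(40, 4)); // 10 (Example 1, -ys 4)
        System.out.println(taskExecutors(40, 3)); // 14 (Example 2, -ys 3)
    }
}
```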
[jira] [Updated] (FLINK-23190) Make task-slot allocation much more evenly
[ https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] loyi updated FLINK-23190: - Attachment: image-2021-07-16-10-34-30-700.png > Make task-slot allocation much more evenly > -- > > Key: FLINK-23190 > URL: https://issues.apache.org/jira/browse/FLINK-23190 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.12.3 >Reporter: loyi >Priority: Major > Attachments: image-2021-07-16-10-34-30-700.png > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Issue Comment Deleted] (FLINK-23190) Make task-slot allocation much more evenly
[ https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] loyi updated FLINK-23190: - Comment: was deleted (was: Job description:
{code:java}
// flink run -yjm 2048 -ytm 2048 -ys 4 -ynm demo-job -m yarn-cluster -c com.loyi.flink.demo.Bootstrap -d /home/deploy/flink-demo/flink-demo.jar
env.addSource(...).name("A").setParallelism(10)
    .map(...).name("B").setParallelism(30)
    .map(...).name("C").setParallelism(40)
    .addSink(...).name("D").setParallelism(20);
{code})
> Make task-slot allocation much more evenly
> --
>
> Key: FLINK-23190
> URL: https://issues.apache.org/jira/browse/FLINK-23190
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Coordination
> Affects Versions: 1.12.3
> Reporter: loyi
> Priority: Major
>
> FLINK-12122 only guarantees spreading out tasks across the set of TMs that are registered at the time of scheduling, but our jobs all run in active YARN mode, and jobs with smaller source parallelism often cause load-balance issues.
>
> For this job:
> {code:java}
> // -ys 4 means 10 taskmanagers
> env.addSource(...).name("A").setParallelism(10)
>     .map(...).name("B").setParallelism(30)
>     .map(...).name("C").setParallelism(40)
>     .addSink(...).name("D").setParallelism(20);
> {code}
>
> Flink-1.12.3 task allocation:
> ||operator||tm1||tm2||tm3||tm4||tm5||tm6||tm7||tm8||tm9||tm10||
> |A|1|{color:#de350b}2{color}|{color:#de350b}2{color}|1|1|{color:#de350b}3{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|
> |B|3|3|3|3|3|3|3|3|{color:#de350b}2{color}|{color:#de350b}4{color}|
> |C|4|4|4|4|4|4|4|4|4|4|
> |D|2|2|2|2|2|{color:#de350b}1{color}|{color:#de350b}1{color}|2|2|{color:#de350b}4{color}|
>
> Suggestions:
> When TaskManagers start registering slots with the SlotManager, the current processing logic chooses the first pendingSlot that meets the resource requirements. This "random" strategy usually causes uneven task allocation when the source operator's parallelism is significantly below the processing operators'.
> A simple, feasible idea is to {color:#de350b}partition{color} the current "{color:#de350b}pendingSlots{color}" by their "JobVertexIds" (e.g. let the AllocationID carry the detail), then allocate the slots proportionally to each JobVertexGroup.
>
> For the above case, the 40 pendingSlots could be divided into 4 groups:
> [ABCD]: 10 // A, B, C, D represent {color:#de350b}jobVertexId{color}s
> [BCD]: 10
> [CD]: 10
> [D]: 10
>
> Every taskmanager provides 4 slots at a time, and each group gets 1 slot according to its proportion (1/4), so the final allocation result is:
> [ABCD]: deployed on 10 different taskmanagers
> [BCD]: deployed on 10 different taskmanagers
> [CD]: deployed on 10 different taskmanagers
> [D]: deployed on 10 different taskmanagers
>
> I have implemented a [concept code|https://github.com/saddays/flink/commit/dc82e60a7c7599fbcb58c14f8e3445bc8d07ace1] based on Flink-1.12.3; the patched version achieves {color:#de350b}fully even{color} task allocation and works well on my workload. Are there other points that have not been considered, or does it conflict with future plans? Sorry for my poor English.
>
-- This message was sent by Atlassian Jira (v8.3.4#803005)
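The proportional dealing described in the issue can be sketched as a standalone toy model (the class and method names, such as `SlotDealer` and `dealSlots`, are invented for illustration and are not Flink APIs): each registering TaskManager offers its `-ys` slots, and the equal-sized groups claim one slot each per offer, so every group spreads over all 10 TaskManagers.

```java
import java.util.*;

// Toy model of the proposed allocation: group names like "ABCD" stand for the
// set of JobVertexIds sharing a slot; each TaskManager offers `slotsPerTm`
// slots, and the groups claim them round-robin, so equal-sized groups each
// land exactly once per TaskManager.
public class SlotDealer {
    /** Maps group name -> list of TaskManager ids where its slots were placed. */
    static Map<String, List<Integer>> dealSlots(List<String> groups, int numTms, int slotsPerTm) {
        Map<String, List<Integer>> placement = new LinkedHashMap<>();
        for (String g : groups) placement.put(g, new ArrayList<>());
        for (int tm = 0; tm < numTms; tm++) {
            // each TaskManager offers slotsPerTm slots; hand one to each group in turn
            for (int s = 0; s < slotsPerTm; s++) {
                String g = groups.get(s % groups.size());
                placement.get(g).add(tm);
            }
        }
        return placement;
    }

    public static void main(String[] args) {
        // 4 groups of 10 pending slots each, 10 TaskManagers started with -ys 4
        Map<String, List<Integer>> p =
                dealSlots(Arrays.asList("ABCD", "BCD", "CD", "D"), 10, 4);
        // every group is spread over 10 distinct TaskManagers
        for (Map.Entry<String, List<Integer>> e : p.entrySet()) {
            System.out.println(e.getKey() + " -> " + new HashSet<>(e.getValue()).size());
        }
    }
}
```

Under these assumptions the placement is fully even by construction; the real patch has to cope with unequal group sizes, which is where the proportional split comes in.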
[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly
[ https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17381661#comment-17381661 ] loyi commented on FLINK-23190: -- Job description:
{code:java}
// flink run -yjm 2048 -ytm 2048 -ys 4 -ynm demo-job -m yarn-cluster -c com.loyi.flink.demo.Bootstrap -d /home/deploy/flink-demo/flink-demo.jar
env.addSource(...).name("A").setParallelism(10)
    .map(...).name("B").setParallelism(30)
    .map(...).name("C").setParallelism(40)
    .addSink(...).name("D").setParallelism(20);
{code}
[jira] [Comment Edited] (FLINK-23190) Make task-slot allocation much more evenly
[ https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17381353#comment-17381353 ] loyi edited comment on FLINK-23190 at 7/15/21, 1:36 PM: [~trohrmann] Here are the differences:
1. cluster.evenly-spread-out-slots always picks the best slots and then allocates them in slot-request order; however, the request order is sometimes not "stable", which impacts the allocation. Here is the reason:
{code:java}
// class: SlotSharingExecutionSlotAllocator.java
public List<SlotExecutionVertexAssignment> allocateSlotsFor(
        List<ExecutionVertexID> executionVertexIds) {
    ...
    Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> executionsByGroup =
            executionVertexIds.stream()
                    .collect(
                            Collectors.groupingBy(
                                    slotSharingStrategy::getExecutionSlotSharingGroup));
    Map<ExecutionSlotSharingGroup, SharedSlot> slots =
            // The iteration order is uncertain; it could allocate too many
            // slot-requests (belonging to the same SlotSharingGroup) to the
            // same TaskExecutor.
            executionsByGroup.keySet().stream()
                    .map(group -> getOrAllocateSharedSlot(group, sharedSlotProfileRetriever))
                    .collect(
                            Collectors.toMap(
                                    SharedSlot::getExecutionSlotSharingGroup,
                                    Function.identity()));
    ...
}
{code}
In addition, the option doesn't work for active-deployment jobs.
2. My solution tries to allocate the slots (belonging to the same TaskExecutor) to each SlotSharingGroup *in turn.* For example, a job has 12 ExecutionSlotSharingGroups, and a TaskExecutor offers 3 slots (calls JobMaster.offerSlots). First, I group the slot-requests by SlotSharingGroup (not ExecutionSlotSharingGroup):
A: 4
B: 4
C: 4
Then I allocate a slot to each SlotSharingGroup in turn; this should be more even than allocation in request order.
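The "in turn" allocation from point 2 can be sketched as follows (a standalone model; the class name `InTurnAllocator` and method `offerSlots` are invented for illustration, not Flink APIs): 12 pending requests across SlotSharingGroups A, B and C, TaskExecutors each offering 3 slots, and every offer handing one slot per group.

```java
import java.util.*;

// Sketch of the "in turn" allocation: when a TaskExecutor offers slots, hand
// them to the SlotSharingGroups round-robin instead of in (unstable) request
// order, so no single TaskExecutor fills up with requests from one group.
public class InTurnAllocator {
    /** Serves one offer of `numSlots` slots from the pending counts, in turn. */
    static List<String> offerSlots(Map<String, Integer> pending, int numSlots) {
        List<String> served = new ArrayList<>();
        List<String> groups = new ArrayList<>(pending.keySet());
        int i = 0;
        while (numSlots > 0 && pending.values().stream().anyMatch(c -> c > 0)) {
            String g = groups.get(i++ % groups.size());
            if (pending.get(g) > 0) {
                pending.put(g, pending.get(g) - 1); // one slot for this group
                served.add(g);
                numSlots--;
            }
        }
        return served;
    }

    public static void main(String[] args) {
        // 12 pending requests across three SlotSharingGroups, -ys 3
        Map<String, Integer> pending = new LinkedHashMap<>();
        pending.put("A", 4);
        pending.put("B", 4);
        pending.put("C", 4);
        for (int te = 0; te < 4; te++) {
            // every TaskExecutor's offer serves each group exactly once
            System.out.println(offerSlots(pending, 3)); // [A, B, C]
        }
    }
}
```

Contrast this with iterating an unordered `HashMap` key set, where one offer may serve the same group several times depending on hash order.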
[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly
[ https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17380372#comment-17380372 ] loyi commented on FLINK-23190: -- Hi [~trohrmann], sorry about my poor English again; maybe I didn't understand the true intention of your words above. Could you give me some advice on how to continue this process?
[jira] [Comment Edited] (FLINK-23190) Make task-slot allocation much more evenly
[ https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17379093#comment-17379093 ] loyi edited comment on FLINK-23190 at 7/12/21, 10:03 AM: - [~trohrmann] You are right! If a job needs 40 physical slots and specifies -ys 3, it will evenly apply for 14 TaskExecutors. What the ResourceManager can improve is applying for 12 TaskExecutors with 3 slots and 2 TaskExecutors with 2 slots; if not optimized, it may apply for 13 TaskExecutors with 3 slots and 1 TaskExecutor with 1 slot. That does impact even slot allocation, but it can be avoided by adjusting the parallelism (-ys, etc.) of the job. If every task's parallelism can be divided exactly by the number of TaskExecutors, I think it is fine to just let the JobMaster handle this.
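The arithmetic in that comment can be checked with a small helper (a hypothetical utility written for this sketch, not Flink code): distribute `totalSlots` over the minimum number of TaskExecutors of capacity `slotsPerTe` so that executor loads differ by at most one.

```java
import java.util.*;

// Check of the example: 40 slots with -ys 3 need ceil(40/3) = 14 TaskExecutors;
// a balanced spread is 12 executors with 3 slots and 2 with 2 slots, instead of
// the unbalanced 13 x 3 + 1 x 1.
public class SlotSpread {
    /** Returns per-TaskExecutor slot counts, most-loaded first. */
    static List<Integer> balancedSpread(int totalSlots, int slotsPerTe) {
        int numTe = (totalSlots + slotsPerTe - 1) / slotsPerTe; // ceiling division
        int base = totalSlots / numTe;  // 40 / 14 = 2 slots everywhere ...
        int extra = totalSlots % numTe; // ... plus 1 extra on 12 executors
        List<Integer> counts = new ArrayList<>();
        for (int i = 0; i < numTe; i++) counts.add(i < extra ? base + 1 : base);
        return counts;
    }

    public static void main(String[] args) {
        List<Integer> c = balancedSpread(40, 3);
        System.out.println(c.size());                    // 14
        System.out.println(Collections.frequency(c, 3)); // 12
        System.out.println(Collections.frequency(c, 2)); // 2
    }
}
```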
[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly
[ https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17379046#comment-17379046 ] loyi commented on FLINK-23190: -- [~trohrmann] Hi, maybe we don't need to add an extra strategy to the ResourceManager; as you said above, adding a different slot selection strategy on the JobMaster is enough for this issue. In addition, the ResourceManager should just be responsible for resource application. Do I understand that right? Looking forward to your reply.
[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly
[ https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17378430#comment-17378430 ] loyi commented on FLINK-23190: -- [~trohrmann] Do you mean the solution couldn't achieve totally even allocation because we don't put the logic in the ResourceManager? I think the JobMaster can make an even allocation without telling the slot-sharing detail to the ResourceManager, because the allocation happens when the ResourceManager offers new slots to the JobMaster, and the current solution assigns each TaskManager's slots as evenly as possible. I have also tested several real work scenarios, and it performs well in all of them.
[jira] [Comment Edited] (FLINK-23190) Make task-slot allocation much more evenly
[ https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17377343#comment-17377343 ] loyi edited comment on FLINK-23190 at 7/9/21, 7:45 AM: --- [~trohrmann] thanks for your explanation. I tried implementing my idea on Flink-1.13.1, and it works well with both the default *declarative-resource-management* and the *AdaptiveScheduler*. Here is the idea: # When *SlotSharingExecutionSlotAllocator* starts a SlotRequestId, let the request carry the "detail" of its ExecutionGroup before it is put into *pendingRequests*. # When the *NewSlotsListener* fires (for example, offering 4 slots), calculate the proportion of each group from these pending *requests* and assign the slots evenly across the groups. # Make the *AdaptiveScheduler* allocate free slots to SlotSharingGroups in *circle* (round-robin) order over the TaskManagers instead of *iteration* order. Since we haven't reached consensus yet, I put the concept code in my repository: Support DeclarativeResourceManagement: [https://github.com/saddays/flink/commit/9e3c51664f77aee1b7ff8b68726a6b5eaa630e5b] Support AdaptiveScheduler: [https://github.com/saddays/flink/commit/18e87f794cf2956fa01592e9077d1a5f467ed05a] Is this solution feasible? Looking forward to your reply!
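The "circle order over these TaskManagers" in step 3 can be sketched as a minimal, self-contained simulation. The class and method names below are hypothetical, not Flink's actual scheduler API; the point is only to show how circle (round-robin) placement spreads slot-sharing groups across TaskManagers instead of draining one TaskManager's free slots before moving to the next:

```java
import java.util.*;

// Hypothetical sketch (not Flink's real AdaptiveScheduler code): place
// `requestCount` slot-sharing groups onto TaskManagers in circular order.
public class RoundRobinSlotAssigner {
    public static Map<String, Integer> assign(List<String> taskManagers,
                                              int slotsPerTm,
                                              int requestCount) {
        // Assumes requestCount <= taskManagers.size() * slotsPerTm.
        Map<String, Integer> assigned = new LinkedHashMap<>();
        for (String tm : taskManagers) assigned.put(tm, 0);
        int i = 0;      // circular cursor over the TaskManager list
        int placed = 0;
        while (placed < requestCount) {
            String tm = taskManagers.get(i % taskManagers.size());
            if (assigned.get(tm) < slotsPerTm) {
                assigned.merge(tm, 1, Integer::sum);
                placed++;
            }
            i++;
        }
        return assigned;
    }

    public static void main(String[] args) {
        // 10 TaskManagers with 4 free slots each, 10 groups to place:
        List<String> tms = new ArrayList<>();
        for (int t = 1; t <= 10; t++) tms.add("tm" + t);
        // Circle order gives every TaskManager exactly one group, whereas
        // iteration order would pack the first few TaskManagers full.
        System.out.println(assign(tms, 4, 10));
    }
}
```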
[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly
[ https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17377343#comment-17377343 ] loyi commented on FLINK-23190: -- [~trohrmann] thanks for your explanation. I tried implementing my idea on Flink-1.13.1; it works well with the default *declarative-resource-management* feature, but not with the AdaptiveScheduler. Here is the idea: # When *SlotSharingExecutionSlotAllocator* starts a SlotRequestId, let the request carry the "detail" of its ExecutionGroup before it is put into *pendingRequests*. # When the *NewSlotsListener* fires (for example, offering 4 slots), calculate the proportion of each group from these pending *requests* and assign the slots evenly across the groups. Since we haven't reached consensus yet, I put the concept code in my repository: https://github.com/saddays/flink/commit/9e3c51664f77aee1b7ff8b68726a6b5eaa630e5b Is this solution feasible? Looking forward to your reply!
[jira] [Comment Edited] (FLINK-23190) Make task-slot allocation much more evenly
[ https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17375424#comment-17375424 ] loyi edited comment on FLINK-23190 at 7/6/21, 10:23 AM: [~trohrmann], it seems the solution only works for active resource management, and I think it may not conflict with the AdaptiveScheduler: we just spread out the tasks according to the *current* pending slot requests. Also, are the two features you mentioned already available? Can they solve the load-balance issues? thx
[jira] [Comment Edited] (FLINK-23190) Make task-slot allocation much more evenly
[ https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17375432#comment-17375432 ] loyi edited comment on FLINK-23190 at 7/6/21, 10:05 AM: [~pnowojski], I have tried the option you mentioned; it seems to work only for yarn-session or standalone mode. thx
[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly
[ https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17375432#comment-17375432 ] loyi commented on FLINK-23190: -- [~pnowojski], I have tried the option you mentioned; it seems to work only for yarn-session or standalone mode.
[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly
[ https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17375424#comment-17375424 ] loyi commented on FLINK-23190: -- Till Rohrmann, it seems the solution only works for active resource management. Also, are the two features you mentioned already available? Can they solve the load-balance issues?
[jira] [Updated] (FLINK-23190) Make task-slot allocation much more evenly
[ https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] loyi updated FLINK-23190: - Flags: Patch > Make task-slot allocation much more evenly > -- > > Key: FLINK-23190 > URL: https://issues.apache.org/jira/browse/FLINK-23190 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Affects Versions: 1.12.3 >Reporter: loyi >Priority: Major > > FLINK-12122 only guarantees spreading out tasks across the set of TMs which > are registered at the time of scheduling, but our jobs all run in active > YARN mode, and jobs with smaller source parallelism often cause > load-balance issues. > > For this job: > {code:java} > // -ys 4 means 10 taskmanagers > env.addSource(...).name("A").setParallelism(10) > .map(...).name("B").setParallelism(30) > .map(...).name("C").setParallelism(40) > .addSink(...).name("D").setParallelism(20); > {code} > > Flink-1.12.3 task allocation: > ||operator||tm1||tm2||tm3||tm4||tm5||tm6||tm7||tm8||tm9||tm10|| > |A|1|{color:#de350b}2{color}|{color:#de350b}2{color}|1|1|{color:#de350b}3{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}| > |B|3|3|3|3|3|3|3|3|{color:#de350b}2{color}|{color:#de350b}4{color}| > |C|4|4|4|4|4|4|4|4|4|4| > |D|2|2|2|2|2|{color:#de350b}1{color}|{color:#de350b}1{color}|2|2|{color:#de350b}4{color}| > > Suggestions: > When TaskManagers register their slots with the SlotManager, the current > processing logic chooses the first pendingSlot that meets the resource > requirements. This effectively random strategy usually causes uneven task > allocation when the source operator's parallelism is significantly below > that of the processing operators. > A simple, feasible idea is to {color:#de350b}partition{color} the current > "{color:#de350b}pendingSlots{color}" by their "JobVertexIds" (for example, by > letting the AllocationID carry that detail), then allocate the slots > proportionally to each JobVertexGroup. 
> > For the above case, the 40 pendingSlots can be divided into 4 groups: > [ABCD]: 10 // A, B, C, D represent {color:#de350b}jobVertexIds{color} > [BCD]: 10 > [CD]: 10 > [D]: 10 > > Every taskmanager provides 4 slots at a time, and each group gets 1 slot > according to its proportion (1/4), so the final allocation result is: > [ABCD]: deployed on 10 different taskmanagers > [BCD]: deployed on 10 different taskmanagers > [CD]: deployed on 10 different taskmanagers > [D]: deployed on 10 different taskmanagers > > I have implemented a [concept > code|https://github.com/saddays/flink/commit/dc82e60a7c7599fbcb58c14f8e3445bc8d07ace1] > based on Flink-1.12.3; the patched version achieves {color:#de350b}fully > even{color} task allocation and works well on my workload. Are there other > points that have not been considered, or does this conflict with future > plans? > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
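The proportional allocation described above can be sketched as follows. This is a minimal, self-contained illustration, not Flink's actual SlotManager code: `SlotGroup` and `allocateForTaskManager` are hypothetical names, and drawing each slot from the group with the most remaining pending slots is just one simple way to realize the 1-slot-per-group-per-TM proportion the issue describes.

```java
import java.util.*;

public class ProportionalSlotAllocation {

    /** A group of pending slots that share the same JobVertexId set, e.g. "[ABCD]". */
    static final class SlotGroup {
        final String vertexIds;
        int pending;
        SlotGroup(String vertexIds, int pending) {
            this.vertexIds = vertexIds;
            this.pending = pending;
        }
    }

    /**
     * Assign slotsPerTm slots for one registering TaskManager, always drawing
     * from the group with the most pending slots left, which keeps the groups
     * balanced and gives each group its proportional share per TM.
     */
    static List<String> allocateForTaskManager(List<SlotGroup> groups, int slotsPerTm) {
        List<String> assigned = new ArrayList<>();
        for (int i = 0; i < slotsPerTm; i++) {
            SlotGroup best = Collections.max(groups,
                    Comparator.comparingInt((SlotGroup g) -> g.pending));
            if (best.pending == 0) {
                break; // nothing left to allocate
            }
            best.pending--;
            assigned.add(best.vertexIds);
        }
        return assigned;
    }

    public static void main(String[] args) {
        // the 40 pending slots from the issue, divided into 4 groups of 10
        List<SlotGroup> groups = new ArrayList<>(List.of(
                new SlotGroup("[ABCD]", 10), new SlotGroup("[BCD]", 10),
                new SlotGroup("[CD]", 10), new SlotGroup("[D]", 10)));
        // 10 TaskManagers, each registering 4 slots (-ys 4)
        for (int tm = 1; tm <= 10; tm++) {
            System.out.println("tm" + tm + " -> " + allocateForTaskManager(groups, 4));
        }
        // every TM receives one slot of each group, i.e. a fully even spread
    }
}
```

Under these assumptions, each of the 10 TaskManagers ends up hosting exactly one [ABCD], one [BCD], one [CD], and one [D] slot, matching the allocation table in the proposal.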
[jira] [Updated] (FLINK-23190) Make task-slot allocation much more evenly
[ https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] loyi updated FLINK-23190: - Description: FLINK-12122 only guarantees spreading out tasks across the set of TMs which are registered at the time of scheduling, but our jobs all run in active YARN mode, and jobs with smaller source parallelism often cause load-balance issues. For this job: {code:java} // -ys 4 means 10 taskmanagers env.addSource(...).name("A").setParallelism(10) .map(...).name("B").setParallelism(30) .map(...).name("C").setParallelism(40) .addSink(...).name("D").setParallelism(20); {code} Flink-1.12.3 task allocation: ||operator||tm1||tm2||tm3||tm4||tm5||tm6||tm7||tm8||tm9||tm10|| |A|1|{color:#de350b}2{color}|{color:#de350b}2{color}|1|1|{color:#de350b}3{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}| |B|3|3|3|3|3|3|3|3|{color:#de350b}2{color}|{color:#de350b}4{color}| |C|4|4|4|4|4|4|4|4|4|4| |D|2|2|2|2|2|{color:#de350b}1{color}|{color:#de350b}1{color}|2|2|{color:#de350b}4{color}| Suggestions: When TaskManagers register their slots with the SlotManager, the current processing logic chooses the first pendingSlot that meets the resource requirements. This effectively random strategy usually causes uneven task allocation when the source operator's parallelism is significantly below that of the processing operators. A simple, feasible idea is to {color:#de350b}partition{color} the current "{color:#de350b}pendingSlots{color}" by their "JobVertexIds" (for example, by letting the AllocationID carry that detail), then allocate the slots proportionally to each JobVertexGroup. 
For the above case, the 40 pendingSlots can be divided into 4 groups: [ABCD]: 10 // A, B, C, D represent {color:#de350b}jobVertexIds{color} [BCD]: 10 [CD]: 10 [D]: 10 Every taskmanager provides 4 slots at a time, and each group gets 1 slot according to its proportion (1/4), so the final allocation result is: [ABCD]: deployed on 10 different taskmanagers [BCD]: deployed on 10 different taskmanagers [CD]: deployed on 10 different taskmanagers [D]: deployed on 10 different taskmanagers I have implemented a [concept code|https://github.com/saddays/flink/commit/dc82e60a7c7599fbcb58c14f8e3445bc8d07ace1] based on Flink-1.12.3; the patched version achieves {color:#de350b}fully even{color} task allocation and works well on my workload. Are there other points that have not been considered, or does this conflict with future plans? was: Description: FLINK-12122 only guarantees spreading out tasks across the set of TMs which are registered at the time of scheduling, but our jobs all run in active YARN mode, and jobs with smaller source parallelism often cause load-balance issues. For this job: {code:java} // -ys 4 means 10 taskmanagers env.addSource(...).name("A").setParallelism(10) .map(...).name("B").setParallelism(30) .map(...).name("C").setParallelism(40) .addSink(...).name("D").setParallelism(20); {code} released-1.12.3 allocation: ||operator||tm1||tm2||tm3||tm4||tm5||tm6||tm7||tm8||tm9||tm10|| |A|1|{color:#de350b}2{color}|{color:#de350b}2{color}|1|1|{color:#de350b}3{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}| |B|3|3|3|3|3|3|3|3|{color:#de350b}2{color}|{color:#de350b}4{color}| |C|4|4|4|4|4|4|4|4|4|4| |D|2|2|2|2|2|{color:#de350b}1{color}|{color:#de350b}1{color}|2|2|{color:#de350b}4{color}| Suggestions: When TMs register their slots with the SlotManager, we could group the pendingRequests by their "ExecutionVertexGroup", then allocate the slots proportionally to each group. 
I have implemented a concept version based on release-1.12.3; the job has fully even task allocation. I want to know whether there are other points that have not been considered.
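The earlier wording of the proposal, grouping the pendingRequests by their "ExecutionVertexGroup", amounts to bucketing requests by the set of vertices their slot will host. A minimal sketch under assumed names (`PendingRequest` and `groupByVertexSet` are illustrative stand-ins, not Flink's real SlotManager types):

```java
import java.util.*;

public class GroupPendingRequests {

    // a pending slot request tagged with the JobVertex ids its slot will host
    record PendingRequest(String slotId, Set<String> vertexGroup) {}

    // bucket requests by their vertex set, e.g. {A,B,C,D} vs {B,C,D}
    static Map<Set<String>, List<PendingRequest>> groupByVertexSet(List<PendingRequest> requests) {
        Map<Set<String>, List<PendingRequest>> groups = new HashMap<>();
        for (PendingRequest r : requests) {
            groups.computeIfAbsent(r.vertexGroup(), k -> new ArrayList<>()).add(r);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<PendingRequest> reqs = new ArrayList<>();
        // two groups of 10, mirroring [ABCD]: 10 and [BCD]: 10 from the issue
        for (int i = 0; i < 10; i++) reqs.add(new PendingRequest("s" + i, Set.of("A", "B", "C", "D")));
        for (int i = 10; i < 20; i++) reqs.add(new PendingRequest("s" + i, Set.of("B", "C", "D")));
        System.out.println(groupByVertexSet(reqs).get(Set.of("A", "B", "C", "D")).size()); // prints 10
    }
}
```

Once requests are bucketed this way, the SlotManager could hand each registering TaskManager slots drawn proportionally from every bucket instead of taking the first matching pendingSlot.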
[jira] [Updated] (FLINK-23190) Make task-slot allocation much more evenly
[ https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] loyi updated FLINK-23190: - Priority: Major (was: Minor)
[jira] [Updated] (FLINK-23190) Make task-slot allocation much more evenly
[ https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] loyi updated FLINK-23190: - Description:
[jira] [Created] (FLINK-23190) Make task-slot allocation much more evenly
loyi created FLINK-23190: Summary: Make task-slot allocation much more evenly Key: FLINK-23190 URL: https://issues.apache.org/jira/browse/FLINK-23190 Project: Flink Issue Type: Improvement Components: Runtime / Task Affects Versions: 1.12.3 Reporter: loyi