[jira] [Updated] (FLINK-12762) Memory configuration implementation

2019-06-12 Thread Tony Xintong Song (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tony Xintong Song updated FLINK-12762:
--
Fix Version/s: (was: 1.9.0)

> Memory configuration implementation
> ---
>
> Key: FLINK-12762
> URL: https://issues.apache.org/jira/browse/FLINK-12762
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.8.0, 1.9.0
>Reporter: Tony Xintong Song
>Assignee: Tony Xintong Song
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12812) Set resource profiles for task slots

2019-06-12 Thread Tony Xintong Song (JIRA)
Tony Xintong Song created FLINK-12812:
-

 Summary: Set resource profiles for task slots
 Key: FLINK-12812
 URL: https://issues.apache.org/jira/browse/FLINK-12812
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.8.0, 1.9.0
Reporter: Tony Xintong Song
Assignee: Tony Xintong Song
 Fix For: 1.9.0








[jira] [Updated] (FLINK-12765) Bookkeeping of available resources of allocated slots in SlotPool.

2019-06-09 Thread Tony Xintong Song (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tony Xintong Song updated FLINK-12765:
--
Summary: Bookkeeping of available resources of allocated slots in SlotPool. 
 (was: JobMaster calculates resource needs and requests slot for the entire 
slot sharing group.)

> Bookkeeping of available resources of allocated slots in SlotPool.
> --
>
> Key: FLINK-12765
> URL: https://issues.apache.org/jira/browse/FLINK-12765
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.8.0, 1.9.0
>Reporter: Tony Xintong Song
>Assignee: Yun Gao
>Priority: Major
> Fix For: 1.9.0
>
>
> In this version, a task will always request a slot matching its own resource 
> needs. If those needs are less than the default slot resource, the task will 
> still be allocated to a default-sized slot.
>  
> The extra resources in the slot leave room for other tasks within the same 
> slot sharing group to fit in. To take these chances, the SlotPool will 
> maintain the available resources of each allocated slot. The available 
> resource of an allocated slot should always be the total resource of the slot 
> minus the resources of the tasks already assigned to it. In this way, the 
> SlotPool can determine whether another task fits into the slot. If a task 
> cannot fit into the slot, the SlotPool should request another slot from the 
> ResourceManager in the slot sharing group case, and fail the job in the 
> colocation group case.
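
The bookkeeping described above can be sketched as follows. This is a minimal illustration with hypothetical names (`SlotResourceTracker`, a single CPU dimension instead of Flink's full resource profile); it is not the actual SlotPool code.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Minimal sketch of the bookkeeping described above. Names and the single
 * CPU dimension are hypothetical simplifications; this is not Flink's
 * actual SlotPool or ResourceProfile code.
 */
class SlotResourceTracker {

    // Remaining (unassigned) resource of each allocated slot, keyed by slot id.
    private final Map<String, Double> availableCpu = new HashMap<>();

    /** Record a newly allocated slot with its total resource. */
    void registerSlot(String slotId, double totalCpu) {
        availableCpu.put(slotId, totalCpu);
    }

    /**
     * Deduct the task's need from the slot's remaining resource if it fits.
     * If it does not fit, the caller would request a new slot (slot sharing
     * group) or fail the job (colocation group).
     */
    boolean tryAssign(String slotId, double taskCpu) {
        double remaining = availableCpu.getOrDefault(slotId, 0.0);
        if (taskCpu > remaining) {
            return false;
        }
        availableCpu.put(slotId, remaining - taskCpu);
        return true;
    }

    /** Available resource = total resource minus resources of assigned tasks. */
    double remaining(String slotId) {
        return availableCpu.getOrDefault(slotId, 0.0);
    }
}
```

For example, a 2.0-CPU slot can take a 1.5-CPU task from the same sharing group, after which only a task needing at most 0.5 CPU fits.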





[jira] [Commented] (FLINK-12761) Fine grained resource management

2019-06-06 Thread Tony Xintong Song (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16857497#comment-16857497
 ] 

Tony Xintong Song commented on FLINK-12761:
---

Thanks for the feedback, [~xiaogang.shi].

I agree that the RM is a critical component and that there are more details to 
be described and discussed. I'd like to bring this proposal to the community 
for further discussion before diving deeper into those details, just to gather 
some feedback while working on them. I'll continue to update the design docs, 
adding details and responding to feedback from the community. I would be 
grateful for any feedback, suggestions, and questions about unclear points.

For 1.9, there should not be many changes in the RM. For the time being we 
will stick to the "static slots".

> Fine grained resource management
> 
>
> Key: FLINK-12761
> URL: https://issues.apache.org/jira/browse/FLINK-12761
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Affects Versions: 1.8.0, 1.9.0
>Reporter: Tony Xintong Song
>Assignee: Tony Xintong Song
>Priority: Major
>  Labels: Umbrella
>
> This is an umbrella issue for enabling fine grained resource management in 
> Flink.
> Fine grained resource management is a big topic that requires long term 
> effort. There are many issues to be addressed and design decisions to be 
> made, some of which may not be resolved in a short time. Here we propose our 
> design and implementation plan for the upcoming release 1.9, as well as our 
> thoughts and ideas on the long term roadmap for this topic.
> A practical short term target is to enable fine grained resource management 
> for batch SQL jobs only in the upcoming Flink 1.9. This is necessary for the 
> batch operators added from Blink to achieve good performance.
> Please find the detailed design and implementation plan in the attached docs. 
> Any comments and feedback are welcome and appreciated.





[jira] [Assigned] (FLINK-12765) JobMaster calculates resource needs and requests slot for the entire slot sharing group.

2019-06-06 Thread Tony Xintong Song (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tony Xintong Song reassigned FLINK-12765:
-

Assignee: Yun Gao  (was: Tony Xintong Song)

> JobMaster calculates resource needs and requests slot for the entire slot 
> sharing group.
> 
>
> Key: FLINK-12765
> URL: https://issues.apache.org/jira/browse/FLINK-12765
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.8.0, 1.9.0
>Reporter: Tony Xintong Song
>Assignee: Yun Gao
>Priority: Major
> Fix For: 1.9.0
>
>






[jira] [Created] (FLINK-12766) Dynamically allocate TaskExecutor’s managed memory to slots. This step is a temporary workaround for release 1.9 to meet the basic usability requirements of batch functions from Blink.

2019-06-06 Thread Tony Xintong Song (JIRA)
Tony Xintong Song created FLINK-12766:
-

 Summary: Dynamically allocate TaskExecutor’s managed memory to 
slots. This step is a temporary workaround for release 1.9 to meet the basic 
usability requirements of batch functions from Blink.
 Key: FLINK-12766
 URL: https://issues.apache.org/jira/browse/FLINK-12766
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.8.0, 1.9.0
Reporter: Tony Xintong Song
Assignee: Tony Xintong Song
 Fix For: 1.9.0








[jira] [Created] (FLINK-12765) JobMaster calculates resource needs and requests slot for the entire slot sharing group.

2019-06-06 Thread Tony Xintong Song (JIRA)
Tony Xintong Song created FLINK-12765:
-

 Summary: JobMaster calculates resource needs and requests slot for 
the entire slot sharing group.
 Key: FLINK-12765
 URL: https://issues.apache.org/jira/browse/FLINK-12765
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.8.0, 1.9.0
Reporter: Tony Xintong Song
Assignee: Tony Xintong Song
 Fix For: 1.9.0








[jira] [Created] (FLINK-12764) Launch customized TaskExecutor for tasks whose resource needs cannot be satisfied by existing TaskExecutor-s in active container mode.

2019-06-06 Thread Tony Xintong Song (JIRA)
Tony Xintong Song created FLINK-12764:
-

 Summary: Launch customized TaskExecutor for tasks whose resource 
needs cannot be satisfied by existing TaskExecutor-s in active container mode.
 Key: FLINK-12764
 URL: https://issues.apache.org/jira/browse/FLINK-12764
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.8.0, 1.9.0
Reporter: Tony Xintong Song
Assignee: Tony Xintong Song
 Fix For: 1.9.0








[jira] [Created] (FLINK-12763) Fail job immediately if tasks’ resource needs can not be satisfied in reactive container mode.

2019-06-06 Thread Tony Xintong Song (JIRA)
Tony Xintong Song created FLINK-12763:
-

 Summary: Fail job immediately if tasks’ resource needs can not be 
satisfied in reactive container mode.
 Key: FLINK-12763
 URL: https://issues.apache.org/jira/browse/FLINK-12763
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.8.0, 1.9.0
Reporter: Tony Xintong Song
Assignee: Tony Xintong Song
 Fix For: 1.9.0








[jira] [Created] (FLINK-12762) Memory configuration implementation

2019-06-06 Thread Tony Xintong Song (JIRA)
Tony Xintong Song created FLINK-12762:
-

 Summary: Memory configuration implementation
 Key: FLINK-12762
 URL: https://issues.apache.org/jira/browse/FLINK-12762
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.8.0, 1.9.0
Reporter: Tony Xintong Song
Assignee: Tony Xintong Song
 Fix For: 1.9.0








[jira] [Updated] (FLINK-12761) Fine grained resource management

2019-06-06 Thread Tony Xintong Song (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tony Xintong Song updated FLINK-12761:
--
External issue URL:   (was: 
https://docs.google.com/document/d/1UR3vYsLOPXMGVyXHXYg3b5yNZcvSTvot4wela8knVAY/edit?usp=sharing)

> Fine grained resource management
> -
>
> Key: FLINK-12761
> URL: https://issues.apache.org/jira/browse/FLINK-12761
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Affects Versions: 1.8.0, 1.9.0
>Reporter: Tony Xintong Song
>Assignee: Tony Xintong Song
>Priority: Major
>  Labels: Umbrella
>
> This is an umbrella issue for enabling fine grained resource management in 
> Flink.
> Fine grained resource management is a big topic that requires long term 
> effort. There are many issues to be addressed and design decisions to be 
> made, some of which may not be resolved in a short time. Here we propose our 
> design and implementation plan for the upcoming release 1.9, as well as our 
> thoughts and ideas on the long term roadmap for this topic.
> A practical short term target is to enable fine grained resource management 
> for batch SQL jobs only in the upcoming Flink 1.9. This is necessary for the 
> batch operators added from Blink to achieve good performance.
> Please find the detailed design and implementation plan in the attached docs. 
> Any comments and feedback are welcome and appreciated.





[jira] [Created] (FLINK-12761) Fine grained resource management

2019-06-06 Thread Tony Xintong Song (JIRA)
Tony Xintong Song created FLINK-12761:
-

 Summary: Fine grained resource management
 Key: FLINK-12761
 URL: https://issues.apache.org/jira/browse/FLINK-12761
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Configuration
Affects Versions: 1.8.0, 1.9.0
Reporter: Tony Xintong Song
Assignee: Tony Xintong Song


This is an umbrella issue for enabling fine grained resource management in 
Flink.

Fine grained resource management is a big topic that requires long term 
effort. There are many issues to be addressed and design decisions to be made, 
some of which may not be resolved in a short time. Here we propose our design 
and implementation plan for the upcoming release 1.9, as well as our thoughts 
and ideas on the long term roadmap for this topic.

A practical short term target is to enable fine grained resource management 
for batch SQL jobs only in the upcoming Flink 1.9. This is necessary for the 
batch operators added from Blink to achieve good performance.

Please find the detailed design and implementation plan in the attached docs. 
Any comments and feedback are welcome and appreciated.





[jira] [Commented] (FLINK-12362) Recover container number config option for Flink on yarn job cluster mode in the doc

2019-04-29 Thread Tony Xintong Song (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16829029#comment-16829029
 ] 

Tony Xintong Song commented on FLINK-12362:
---

Hi, [~yanghua],

I don't understand what your concern is about calculating the number of 
containers in job cluster mode.

To my understanding, Flink launches as many TEs as the job needs, by starting 
a new TE whenever a slot request cannot be fulfilled. Users do not need to set 
the number of TEs, which looks good to me.

There is a config option taskmanager.numberOfTaskSlots that controls the 
number of slots per task executor. If both the number of total tasks (which 
depends on parallelisms and operator chaining) and the number of slots per TE 
are determined, then the number of TEs is also determined.
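
That relationship can be sketched with back-of-the-envelope arithmetic (illustrative only, not Flink code): the TaskExecutor count follows from the slot demand by ceiling division.

```java
/**
 * Illustrative arithmetic only (not Flink code): once the total number of
 * slots needed and the slots per TaskExecutor are fixed, the TaskExecutor
 * count follows by ceiling division.
 */
class TaskExecutorCount {
    static int requiredTaskExecutors(int totalSlotsNeeded, int slotsPerTaskExecutor) {
        // A partially filled TaskExecutor still has to be launched.
        return (totalSlotsNeeded + slotsPerTaskExecutor - 1) / slotsPerTaskExecutor;
    }
}
```

For instance, 10 needed slots with taskmanager.numberOfTaskSlots set to 3 implies 4 TaskExecutors, the last one only partially used.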

> Recover container number config option for Flink on yarn job cluster mode in 
> the doc
> 
>
> Key: FLINK-12362
> URL: https://issues.apache.org/jira/browse/FLINK-12362
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> Recently, the {{-n}} config option has been removed from the document; the 
> commit is here: 
> [https://github.com/apache/flink/commit/60507b8baf5fba999b7c2c45c150914f38f5ce38#diff-8ae07fac4c1f4137fc1ef062aef70115L100]
> I can understand that maybe this option is useless for a Flink session. But 
> for job cluster mode, removing this option would confuse users:
>  * has {{-n}} been deprecated and replaced with {{-p}}?
>  * {{-p}} sets the job level parallelism, but what if users want to set a 
> single operator's parallelism? How should the number of containers be 
> calculated?
>  * the {{-n}} option still exists in {{FlinkYarnSessionCli}}
>  * we have customized the source code, which breaks the "one container, one 
> slot" rule, so {{-n}} is still useful
>  
>  





[jira] [Comment Edited] (FLINK-12122) Spread out tasks evenly across all available registered TaskManagers

2019-04-08 Thread Tony Xintong Song (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16812308#comment-16812308
 ] 

Tony Xintong Song edited comment on FLINK-12122 at 4/8/19 10:50 AM:


Hi [~till.rohrmann], I agree with you that we should spread tasks evenly across 
TaskManagers.

FYI, a similar behavior change is already included in the to-be-merged [blink 
branch|https://github.com/apache/flink/blob/blink/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DynamicAssigningSlotManager.java],
 where we support three policies (random, spreading based on available slots, 
and spreading based on available resources) for distributing slots to 
TaskManagers in session mode.


was (Author: xintongsong):
Hi [~till.rohrmann], I agree with you that we should spread tasks evenly across 
TaskManagers.

FYI, similar behavior change is already included in the to-be-merged [blink 
branch|https://github.com/apache/flink/blob/blink/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DynamicAssigningSlotManager.java],
 where we support three policies (random, spreading based on available slots, 
and spreading based on available resource) for distribute slots to TaskManagers 
for session mode. [link title|http://example.com]

> Spread out tasks evenly across all available registered TaskManagers
> 
>
> Key: FLINK-12122
> URL: https://issues.apache.org/jira/browse/FLINK-12122
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.6.4, 1.7.2, 1.8.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
> Fix For: 1.7.3, 1.9.0, 1.8.1
>
>
> With Flip-6, we changed the default behaviour of how slots are assigned to 
> {{TaskManagers}}. Instead of evenly spreading them out over all registered 
> {{TaskManagers}}, we randomly pick slots from {{TaskManagers}}, with a 
> tendency to first fill up a TM before using another one. This is a regression 
> w.r.t. the pre-Flip-6 code.
> I suggest changing the behaviour so that we try to evenly distribute slots 
> across all available {{TaskManagers}} by considering how many of their slots 
> are already allocated.
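
The "spread based on available slots" idea can be sketched as picking the TaskManager with the most free slots for each request. This is a hypothetical sketch, not the actual SlotManager or DynamicAssigningSlotManager code.

```java
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical sketch of the "spread based on available slots" policy
 * mentioned above; not the actual slot manager code.
 */
class SpreadOutSelector {

    /** Pick the TaskManager with the most free slots, so load spreads evenly. */
    static Optional<String> pick(Map<String, Integer> freeSlotsByTm) {
        return freeSlotsByTm.entrySet().stream()
                .filter(e -> e.getValue() > 0)
                .max((a, b) -> Integer.compare(a.getValue(), b.getValue()))
                .map(Map.Entry::getKey);
    }
}
```

Choosing the least-loaded TM per request yields the even distribution the issue asks for, in contrast to the fill-up-one-TM-first behaviour it describes as a regression.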





[jira] [Comment Edited] (FLINK-12122) Spread out tasks evenly across all available registered TaskManagers

2019-04-08 Thread Tony Xintong Song (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16812308#comment-16812308
 ] 

Tony Xintong Song edited comment on FLINK-12122 at 4/8/19 10:46 AM:


Hi [~till.rohrmann], I agree with you that we should spread tasks evenly across 
TaskManagers.

FYI, similar behavior change is already included in the to-be-merged [blink 
branch|https://github.com/apache/flink/blob/blink/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DynamicAssigningSlotManager.java],
 where we support three policies (random, spreading based on available slots, 
and spreading based on available resource) for distribute slots to TaskManagers 
for session mode. [link title|http://example.com]


was (Author: xintongsong):
Hi [~till.rohrmann], I agree with you that we should spread tasks evenly across 
TaskManagers.

FYI, similar behavior change is already included in the to-be-merged [blink 
branch|[https://github.com/apache/flink/blob/blink/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DynamicAssigningSlotManager.java]],
 where we support three policies (random, spreading based on available slots, 
and spreading based on available resource) for distribute slots to TaskManagers 
for session mode.

> Spread out tasks evenly across all available registered TaskManagers
> 
>
> Key: FLINK-12122
> URL: https://issues.apache.org/jira/browse/FLINK-12122
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.6.4, 1.7.2, 1.8.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
> Fix For: 1.7.3, 1.9.0, 1.8.1
>
>
> With Flip-6, we changed the default behaviour of how slots are assigned to 
> {{TaskManagers}}. Instead of evenly spreading them out over all registered 
> {{TaskManagers}}, we randomly pick slots from {{TaskManagers}}, with a 
> tendency to first fill up a TM before using another one. This is a regression 
> w.r.t. the pre-Flip-6 code.
> I suggest changing the behaviour so that we try to evenly distribute slots 
> across all available {{TaskManagers}} by considering how many of their slots 
> are already allocated.





[jira] [Commented] (FLINK-12122) Spread out tasks evenly across all available registered TaskManagers

2019-04-08 Thread Tony Xintong Song (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16812308#comment-16812308
 ] 

Tony Xintong Song commented on FLINK-12122:
---

Hi [~till.rohrmann], I agree with you that we should spread tasks evenly across 
TaskManagers.

FYI, similar behavior change is already included in the to-be-merged [blink 
branch|[https://github.com/apache/flink/blob/blink/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DynamicAssigningSlotManager.java]],
 where we support three policies (random, spreading based on available slots, 
and spreading based on available resource) for distribute slots to TaskManagers 
for session mode.

> Spread out tasks evenly across all available registered TaskManagers
> 
>
> Key: FLINK-12122
> URL: https://issues.apache.org/jira/browse/FLINK-12122
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.6.4, 1.7.2, 1.8.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
> Fix For: 1.7.3, 1.9.0, 1.8.1
>
>
> With Flip-6, we changed the default behaviour of how slots are assigned to 
> {{TaskManagers}}. Instead of evenly spreading them out over all registered 
> {{TaskManagers}}, we randomly pick slots from {{TaskManagers}}, with a 
> tendency to first fill up a TM before using another one. This is a regression 
> w.r.t. the pre-Flip-6 code.
> I suggest changing the behaviour so that we try to evenly distribute slots 
> across all available {{TaskManagers}} by considering how many of their slots 
> are already allocated.





[jira] [Assigned] (FLINK-11166) Slot Placement Constraint

2018-12-14 Thread Tony Xintong Song (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tony Xintong Song reassigned FLINK-11166:
-

Assignee: Tony Xintong Song

> Slot Placement Constraint
> -
>
> Key: FLINK-11166
> URL: https://issues.apache.org/jira/browse/FLINK-11166
> Project: Flink
>  Issue Type: New Feature
>  Components: ResourceManager
>Reporter: Tony Xintong Song
>Assignee: Tony Xintong Song
>Priority: Major
>
> In many cases, users may want Flink to schedule their job tasks following 
> certain locality preferences, e.g., colocating upstream and downstream tasks 
> to reduce data transmission costs, dispersing tasks of certain patterns 
> (e.g., I/O intensive) to avoid resource competition, or running tasks in 
> exclusive TaskExecutors for task level resource consumption measurements.
> Currently, there are two ways in Flink to specify such locality preferences: 
> specifying preferred locations in the slot request, or setting a slot sharing 
> group for the task. In both ways the preferences are specified when 
> requesting slots from the SlotPool and can affect how tasks are placed among 
> the slots allocated to the JobMaster.
> However, there is no guarantee that such preferences can always be satisfied, 
> especially when slots are customized with different resource profiles for 
> different kinds of tasks. E.g., in cases where two tasks A and B should 
> preferably be scheduled onto the same TaskExecutor, it is possible that none 
> of the slots customized for A that are offered to the JobMaster are 
> collocated with slots customized for B.
> To better support locality preferences with various slot resource 
> specifications, it is necessary to allow JobMasters to request slots subject 
> to certain placement constraints from the ResourceManager.
> In addition, most underlying frameworks Flink runs on (Yarn, Kubernetes, 
> Mesos) already have individual support for container level placement 
> constraints. It is a great opportunity for Flink to leverage such underlying 
> support and enable scheduling tasks with rich locality preferences.





[jira] [Updated] (FLINK-11166) Slot Placement Constraint

2018-12-14 Thread Tony Xintong Song (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tony Xintong Song updated FLINK-11166:
--
Description: 
In many cases, users may want Flink to schedule their job tasks following 
certain locality preferences, e.g., colocating upstream and downstream tasks 
to reduce data transmission costs, dispersing tasks of certain patterns (e.g., 
I/O intensive) to avoid resource competition, or running tasks in exclusive 
TaskExecutors for task level resource consumption measurements.

Currently, there are two ways in Flink to specify such locality preferences: 
specifying preferred locations in the slot request, or setting a slot sharing 
group for the task. In both ways the preferences are specified when requesting 
slots from the SlotPool and can affect how tasks are placed among the slots 
allocated to the JobMaster.

However, there is no guarantee that such preferences can always be satisfied, 
especially when slots are customized with different resource profiles for 
different kinds of tasks. E.g., in cases where two tasks A and B should 
preferably be scheduled onto the same TaskExecutor, it is possible that none 
of the slots customized for A that are offered to the JobMaster are collocated 
with slots customized for B.

To better support locality preferences with various slot resource 
specifications, it is necessary to allow JobMasters to request slots subject 
to certain placement constraints from the ResourceManager.

In addition, most underlying frameworks Flink runs on (Yarn, Kubernetes, 
Mesos) already have individual support for container level placement 
constraints. It is a great opportunity for Flink to leverage such underlying 
support and enable scheduling tasks with rich locality preferences.

  was:
In many cases, users may want Flink to schedule their job tasks following 
certain locality preferences. E.g., colocating upstream/downstream tasks to 
reduce data transmission costs, dispersing tasks of certain pattern (e.g., I/O 
intensive) to avoid resource competitions, running tasks in exclusive 
TaskExecutor-s for task level resource consumption measurements, etc.

 

Currently, there are two ways in Flink to specify such locality preferences: 
specifying preferred locations in the slot request, or setting slot sharing 
group for the task. In both ways the preferences are specified when requesting 
slots from the SlotPool and can affect how tasks are placed among the slots 
allocated to the JobMaster.

However, there is no guarantee that such preferences can always be satisfied, 
especially when slots are customized with different resource profiles for 
different kinds of tasks. E.g., in cases where two tasks A and B are preferred 
to be scheduled onto a same TaskExecutor, it is possible that none of the slots 
customized for A offered to the JobMaster are collocated with slots customized 
for B.

To better support locality preferences with various slot resource 
specifications, it is necessary to allow JobMaster-s to request slots subjected 
to certain placement constraints from the ResourceManager.

In addition, most underlying frameworks Flink runs on (Yarn, Kubernetes, Mesos) 
already have individual supports for container level placement constraints. It 
is a great opportunity for Flink to leverage such underlying supports and 
enable scheduling tasks with rich locality preferences.


> Slot Placement Constraint
> -
>
> Key: FLINK-11166
> URL: https://issues.apache.org/jira/browse/FLINK-11166
> Project: Flink
>  Issue Type: New Feature
>  Components: ResourceManager
>Reporter: Tony Xintong Song
>Priority: Major
>
> In many cases, users may want Flink to schedule their job tasks following 
> certain locality preferences, e.g., colocating upstream and downstream tasks 
> to reduce data transmission costs, dispersing tasks of certain patterns 
> (e.g., I/O intensive) to avoid resource competition, or running tasks in 
> exclusive TaskExecutors for task level resource consumption measurements.
> Currently, there are two ways in Flink to specify such locality preferences: 
> specifying preferred locations in the slot request, or setting a slot sharing 
> group for the task. In both ways the preferences are specified when 
> requesting slots from the SlotPool and can affect how tasks are placed among 
> the slots allocated to the JobMaster.
> However, there is no guarantee that such preferences can always be satisfied, 
> especially when slots are customized with different resource profiles for 
> different kinds of tasks. E.g., in cases where two tasks A and B should 
> preferably be scheduled onto the same TaskExecutor, it is possible that none 
> of the slots customized for A that are offered to the JobMaster are 
> collocated with slots customized for B.
> To better support locality preferences with various slot resource 
> specifications, it is necessary to allow JobMasters to request slots subject 
> to certain placement constraints from the ResourceManager.

[jira] [Commented] (FLINK-11166) Slot Placement Constraint

2018-12-14 Thread Tony Xintong Song (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721074#comment-16721074
 ] 

Tony Xintong Song commented on FLINK-11166:
---

Hi [~till.rohrmann], I'd like to work on this issue myself. Could you please 
kindly give me the contributor permission? Many thanks.

> Slot Placement Constraint
> -
>
> Key: FLINK-11166
> URL: https://issues.apache.org/jira/browse/FLINK-11166
> Project: Flink
>  Issue Type: New Feature
>  Components: ResourceManager
>Reporter: Tony Xintong Song
>Priority: Major
>
> In many cases, users may want Flink to schedule their job tasks following 
> certain locality preferences, e.g., colocating upstream and downstream tasks 
> to reduce data transmission costs, dispersing tasks of certain patterns 
> (e.g., I/O intensive) to avoid resource competition, or running tasks in 
> exclusive TaskExecutors for task level resource consumption measurements.
> Currently, there are two ways in Flink to specify such locality preferences: 
> specifying preferred locations in the slot request, or setting a slot sharing 
> group for the task. In both ways the preferences are specified when 
> requesting slots from the SlotPool and can affect how tasks are placed among 
> the slots allocated to the JobMaster.
> However, there is no guarantee that such preferences can always be satisfied, 
> especially when slots are customized with different resource profiles for 
> different kinds of tasks. E.g., in cases where two tasks A and B should 
> preferably be scheduled onto the same TaskExecutor, it is possible that none 
> of the slots customized for A that are offered to the JobMaster are 
> collocated with slots customized for B.
> To better support locality preferences with various slot resource 
> specifications, it is necessary to allow JobMasters to request slots subject 
> to certain placement constraints from the ResourceManager.
> In addition, most underlying frameworks Flink runs on (Yarn, Kubernetes, 
> Mesos) already have individual support for container level placement 
> constraints. It is a great opportunity for Flink to leverage such underlying 
> support and enable scheduling tasks with rich locality preferences.





[jira] [Created] (FLINK-11166) Slot Placement Constraint

2018-12-14 Thread Tony Xintong Song (JIRA)
Tony Xintong Song created FLINK-11166:
-

 Summary: Slot Placement Constraint
 Key: FLINK-11166
 URL: https://issues.apache.org/jira/browse/FLINK-11166
 Project: Flink
  Issue Type: New Feature
  Components: ResourceManager
Reporter: Tony Xintong Song


In many cases, users may want Flink to schedule their job tasks following 
certain locality preferences. E.g., colocating upstream/downstream tasks to 
reduce data transmission costs, dispersing tasks of certain pattern (e.g., I/O 
intensive) to avoid resource competitions, running tasks in exclusive 
TaskExecutor-s for task level resource consumption measurements, etc.

 

Currently, there are two ways in Flink to specify such locality preferences: 
specifying preferred locations in the slot request, or setting slot sharing 
group for the task. In both ways the preferences are specified when requesting 
slots from the SlotPool and can affect how tasks are placed among the slots 
allocated to the JobMaster.

However, there is no guarantee that such preferences can always be satisfied, 
especially when slots are customized with different resource profiles for 
different kinds of tasks. E.g., when two tasks A and B are preferred to be 
scheduled onto the same TaskExecutor, it is possible that none of the slots 
customized for A and offered to the JobMaster are collocated with slots 
customized for B.

To better support locality preferences with various slot resource 
specifications, it is necessary to allow JobMaster-s to request slots subject 
to certain placement constraints from the ResourceManager.

In addition, most underlying frameworks Flink runs on (Yarn, Kubernetes, Mesos) 
already provide their own support for container-level placement constraints. It 
is a great opportunity for Flink to leverage that underlying support and enable 
scheduling tasks with rich locality preferences.
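As a rough illustration of the kind of constraint the issue describes, the sketch below models colocation (same TaskExecutor) and separation (different TaskExecutors) constraints with a greedy placement. All types and the placement strategy are hypothetical, not Flink's actual scheduler API:

```java
import java.util.*;

// Hypothetical sketch: COLOCATE groups pin all their requests to one executor,
// SEPARATE groups spread their requests across distinct executors.
public class PlacementConstraintSketch {

    enum ConstraintType { COLOCATE, SEPARATE }

    record SlotRequest(String id, ConstraintType constraint, String groupKey) {}

    // Greedily assign each request to a task executor, honoring its group constraint.
    static Map<String, String> place(List<SlotRequest> requests, List<String> executors) {
        Map<String, String> groupToExecutor = new HashMap<>();   // COLOCATE: group -> pinned executor
        Map<String, Set<String>> usedByGroup = new HashMap<>();  // SEPARATE: group -> executors taken
        Map<String, String> assignment = new HashMap<>();
        for (SlotRequest r : requests) {
            String chosen;
            if (r.constraint() == ConstraintType.COLOCATE) {
                chosen = groupToExecutor.computeIfAbsent(r.groupKey(), k -> executors.get(0));
            } else {
                Set<String> used = usedByGroup.computeIfAbsent(r.groupKey(), k -> new HashSet<>());
                chosen = executors.stream()
                        .filter(e -> !used.contains(e))
                        .findFirst()
                        .orElseThrow(() -> new IllegalStateException("constraint unsatisfiable"));
                used.add(chosen);
            }
            assignment.put(r.id(), chosen);
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> executors = List.of("tm-1", "tm-2");
        List<SlotRequest> reqs = List.of(
                new SlotRequest("A", ConstraintType.COLOCATE, "g1"),
                new SlotRequest("B", ConstraintType.COLOCATE, "g1"),
                new SlotRequest("C", ConstraintType.SEPARATE, "io"),
                new SlotRequest("D", ConstraintType.SEPARATE, "io"));
        Map<String, String> a = place(reqs, executors);
        assert a.get("A").equals(a.get("B"));   // colocated upstream/downstream
        assert !a.get("C").equals(a.get("D"));  // I/O-intensive tasks dispersed
        System.out.println(a);
    }
}
```

A real implementation would also have to reconcile conflicting constraints and resource profiles; the point here is only the shape of the constraint semantics.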





[jira] [Commented] (FLINK-10640) Enable Slot Resource Profile for Resource Management

2018-12-05 Thread Tony Xintong Song (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16710884#comment-16710884
 ] 

Tony Xintong Song commented on FLINK-10640:
---

Hi [~Tison], what you proposed is exactly what I've been thinking. I think it's 
actually an important part of TM management. If we are going to manage a dynamic 
number of {{TaskExecutor}} s, I think we do need such a pair of (min, max) to 
allow users to limit the range of the {{TaskExecutor}} count.
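To illustrate, such a (min, max) pair could bound whatever TM count a scaling policy asks for. The class and semantics below are assumptions for the sketch, not actual Flink configuration:

```java
// Sketch (hypothetical, not Flink's actual config options): a user-configured
// (min, max) range that clamps the desired number of TaskExecutors.
public class TaskExecutorRange {
    final int min;
    final int max;

    TaskExecutorRange(int min, int max) {
        if (min < 0 || max < min) {
            throw new IllegalArgumentException("need 0 <= min <= max");
        }
        this.min = min;
        this.max = max;
    }

    // The ResourceManager would bound however many TMs its policy requests.
    int clamp(int desired) {
        return Math.max(min, Math.min(max, desired));
    }

    public static void main(String[] args) {
        TaskExecutorRange range = new TaskExecutorRange(2, 10);
        assert range.clamp(0) == 2;     // never scale below min
        assert range.clamp(5) == 5;     // within range, unchanged
        assert range.clamp(100) == 10;  // never scale above max
        System.out.println("ok");
    }
}
```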

> Enable Slot Resource Profile for Resource Management
> 
>
> Key: FLINK-10640
> URL: https://issues.apache.org/jira/browse/FLINK-10640
> Project: Flink
>  Issue Type: New Feature
>  Components: ResourceManager
>Reporter: Tony Xintong Song
>Priority: Major
>
> Motivation & Backgrounds
>  * The existing concept of task slots roughly represents how many pipelines of 
> tasks a TaskManager can hold. However, it does not consider the differences 
> in resource needs and usage of individual tasks. Enabling resource profiles 
> for slots may allow Flink to better allocate execution resources according to 
> tasks' fine-grained resource needs.
>  * The community version of Flink already contains APIs and some implementation 
> for slot resource profiles. However, this logic is not truly used. 
> (The ResourceProfile of slot requests is by default set to UNKNOWN with negative 
> values, and thus matches any given slot.)
> Preliminary Design
>  * Slot Management
>  A slot represents a certain amount of resources for a single pipeline of 
> tasks to run on a TaskManager. Initially, a TaskManager does not have any 
> slots but only a total amount of resources. When allocating, the ResourceManager 
> finds proper TMs on which to generate new slots for the tasks to run, according 
> to the slot requests. Once generated, a slot's size (resource profile) does not 
> change until it is freed. The ResourceManager can apply different, portable 
> strategies to allocate slots from TaskManagers.
>  * TM Management
>  The size and number of TaskManagers, and when to start them, can also be 
> flexible. TMs can be started and released dynamically, and may have different 
> sizes. We may have many different, portable strategies. E.g., an elastic 
> session that can run multiple jobs like the session mode, while dynamically 
> adjusting the size of the session (number of TMs) according to the realtime 
> workload.
>  * About Slot Sharing
>  Slot sharing is a good heuristic to easily calculate how many slots are needed 
> to get a job running, and to get better utilization when slots have no resource 
> profiles. However, with resource profiles enabling finer-grained resource 
> management, each individual task has its specific resource needs, and it does 
> not make much sense to have multiple tasks share the resources of the same 
> slot. Instead, we may introduce locality preferences/constraints to support the 
> semantics of putting tasks in the same/different TMs in a more general way.
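The UNKNOWN-matches-anything behavior mentioned in the quoted description can be sketched as follows. This is a simplified stand-in, not the real Flink ResourceProfile class, and the field set is reduced to CPU and memory for illustration:

```java
// Simplified sketch of slot/request matching: a request profile with negative
// values (UNKNOWN) matches any offered slot; otherwise each dimension of the
// request must fit within the slot's profile.
public class ResourceProfileSketch {
    final double cpuCores;
    final int memoryMB;

    static final ResourceProfileSketch UNKNOWN = new ResourceProfileSketch(-1.0, -1);

    ResourceProfileSketch(double cpuCores, int memoryMB) {
        this.cpuCores = cpuCores;
        this.memoryMB = memoryMB;
    }

    // true if a slot with this profile can fulfill the given request
    boolean isMatching(ResourceProfileSketch required) {
        if (required == UNKNOWN) {
            return true; // default today: UNKNOWN request matches any slot
        }
        return required.cpuCores <= this.cpuCores && required.memoryMB <= this.memoryMB;
    }

    public static void main(String[] args) {
        ResourceProfileSketch slot = new ResourceProfileSketch(1.0, 1024);
        assert slot.isMatching(UNKNOWN);                               // matches anything
        assert slot.isMatching(new ResourceProfileSketch(0.5, 512));   // fits
        assert !slot.isMatching(new ResourceProfileSketch(2.0, 512));  // too much CPU
        System.out.println("ok");
    }
}
```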





[jira] [Commented] (FLINK-10640) Enable Slot Resource Profile for Resource Management

2018-12-03 Thread Tony Xintong Song (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708124#comment-16708124
 ] 

Tony Xintong Song commented on FLINK-10640:
---

Hi [~Tison], it's a good question. Currently there is only one fixed policy 
managing TMs' lifecycles, which is to start a new TM when a slot request cannot 
be fulfilled with the existing ones. I think the RM should be able to apply 
various policies to decide when to start/release TMs and how many TMs the 
session should hold. E.g., start an arbitrary number of TMs when a yarn session 
is launched like you mentioned, start new TMs when the number of empty TMs drops 
below a certain threshold, release a TM if it has been empty for a while, etc. 
And more importantly, the framework should provide flexible and configurable 
interfaces for such policies to customize the TM management behaviors.
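A pluggable policy of the kind described above could look roughly like this. The interface, thresholds, and decision rules are hypothetical illustrations, not Flink's actual SlotManager API:

```java
import java.time.Duration;

// Sketch of a TM lifecycle policy: start a TM when a request cannot be
// fulfilled or the pool of empty TMs drops below a floor; release a TM only
// when it has been idle long enough and the floor is preserved.
public class TmLifecyclePolicy {
    final int minEmptyTms;
    final Duration idleTimeout;

    TmLifecyclePolicy(int minEmptyTms, Duration idleTimeout) {
        this.minEmptyTms = minEmptyTms;
        this.idleTimeout = idleTimeout;
    }

    boolean shouldStartTm(int unfulfilledRequests, int emptyTms) {
        return unfulfilledRequests > 0 || emptyTms < minEmptyTms;
    }

    boolean shouldReleaseTm(Duration idleFor, int emptyTms) {
        // keep the configured floor of empty TMs even if some are idle
        return emptyTms > minEmptyTms && idleFor.compareTo(idleTimeout) >= 0;
    }

    public static void main(String[] args) {
        TmLifecyclePolicy policy = new TmLifecyclePolicy(1, Duration.ofMinutes(5));
        assert policy.shouldStartTm(1, 3);                         // unfulfilled request -> start
        assert policy.shouldStartTm(0, 0);                         // below empty-TM floor -> start
        assert !policy.shouldStartTm(0, 2);                        // nothing to do
        assert policy.shouldReleaseTm(Duration.ofMinutes(10), 2);  // long idle, floor kept
        assert !policy.shouldReleaseTm(Duration.ofMinutes(10), 1); // would break the floor
        System.out.println("ok");
    }
}
```

The framework would only call such hooks; the numbers (floor, timeout) would come from user configuration, which is the "flexible and configurable interfaces" point above.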

> Enable Slot Resource Profile for Resource Management
> 
>
> Key: FLINK-10640
> URL: https://issues.apache.org/jira/browse/FLINK-10640
> Project: Flink
>  Issue Type: New Feature
>  Components: ResourceManager
>Reporter: Tony Xintong Song
>Priority: Major
>





[jira] [Commented] (FLINK-11034) Provide "rewriting config” to file system factory

2018-11-29 Thread Tony Xintong Song (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704369#comment-16704369
 ] 

Tony Xintong Song commented on FLINK-11034:
---

I agree with you that we should allow users to configure multiple jobs in the 
same session differently. However, there are also config options that should 
remain consistent during the lifecycle of the whole session. I think the first 
step towards this job-specific config issue is probably to separate config 
options into two categories: job-level configs, which should be configured for 
each job independently, and session-level configs, which should remain 
consistent. And probably we should have two config files for the two categories. 
Otherwise, it could be troublesome for the user to figure out which config 
options can be "rewritten" when submitting a new job to an existing session and 
which cannot.
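The two-category split could be enforced at job submission roughly as follows. The example keys and the merge helper are made up for the sketch; a real split would need to classify every Flink option:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Sketch: merge job-submitted overrides into the session config, rejecting any
// attempt to rewrite a session-level option. Key names are illustrative only.
public class ConfigCategories {
    static final Set<String> SESSION_LEVEL = Set.of("high-availability.mode", "rest.port");

    static Map<String, String> applyJobOverrides(Map<String, String> sessionConfig,
                                                 Map<String, String> jobOverrides) {
        Map<String, String> merged = new HashMap<>(sessionConfig);
        for (Map.Entry<String, String> e : jobOverrides.entrySet()) {
            if (SESSION_LEVEL.contains(e.getKey())) {
                throw new IllegalArgumentException(
                        "session-level option cannot be rewritten per job: " + e.getKey());
            }
            merged.put(e.getKey(), e.getValue());
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> session = Map.of("rest.port", "8081", "s3.access-key", "default");
        Map<String, String> merged =
                applyJobOverrides(session, Map.of("s3.access-key", "job-key"));
        assert merged.get("s3.access-key").equals("job-key"); // job-level key rewritten
        boolean rejected = false;
        try {
            applyJobOverrides(session, Map.of("rest.port", "9999"));
        } catch (IllegalArgumentException ex) {
            rejected = true;
        }
        assert rejected; // session-level key rejected
        System.out.println("ok");
    }
}
```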

> Provide "rewriting config” to file system factory 
> --
>
> Key: FLINK-11034
> URL: https://issues.apache.org/jira/browse/FLINK-11034
> Project: Flink
>  Issue Type: Improvement
>Reporter: Wei-Che Wei
>Priority: Major
>
> In the discussion in this mailing thread 
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/how-to-override-s3-key-config-in-flink-job-td24606.html],
>  it was shown that it is not possible to overwrite the config in a file system 
> factory when submitting a Flink job.
> That means we share the same config across multiple jobs in a session cluster, 
> and a user can't use different configurations for checkpointing and the file 
> sink. For example, a user might have different s3 buckets for checkpointing and 
> the file sink, where each s3 bucket has a different s3 access key for 
> management reasons.
> We might need to provide a way to overwrite the configuration when calling the 
> file system factory's "get" method, so that user-facing components, like 
> checkpointing or the file sink, are able to take an overriding config from the 
> user and create a file system with those changes applied in the new config.





[jira] [Commented] (FLINK-10640) Enable Slot Resource Profile for Resource Management

2018-10-22 Thread Tony Xintong Song (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16658957#comment-16658957
 ] 

Tony Xintong Song commented on FLINK-10640:
---

Thank you [~till.rohrmann]. I'll start drafting a design doc.

> Enable Slot Resource Profile for Resource Management
> 
>
> Key: FLINK-10640
> URL: https://issues.apache.org/jira/browse/FLINK-10640
> Project: Flink
>  Issue Type: New Feature
>  Components: ResourceManager
>Reporter: Tony Xintong Song
>Priority: Major
>





[jira] [Commented] (FLINK-10640) Enable Slot Resource Profile for Resource Management

2018-10-22 Thread Tony Xintong Song (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16658929#comment-16658929
 ] 

Tony Xintong Song commented on FLINK-10640:
---

Hi [~Tison], thanks for the comment, and thanks for pinging Till. Indeed this 
is a big proposal, so I'd like to put it up for wide discussion before we start 
the implementation.

I think the fundamental difference between the current slot-based management and 
resource-based management is that the latter considers both the amount and the 
size of slots, while the former considers the amount only. Although the sharing 
group mechanism helps improve resource utilization without knowing the precise 
resource needs of individual tasks, it has limitations, especially when tasks' 
parallelisms differ significantly. We believe that resource-based management 
will allow Flink to apply more flexible resource management strategies and 
algorithms, of which the sharing-group-based slot allocation may be only one.

As for [Flink-10407|https://issues.apache.org/jira/browse/FLINK-10407], we did 
notice that people are working on reactive mode and auto-scaling. As far as I 
can see, this issue does not run counter to the issues you mentioned. We are 
closely following the progress of these issues and carefully revising our 
design to make sure it supports reactive mode and auto-scaling.

Elastic session is just a preliminary idea. It's more like the session mode, but 
the number of TMs is not fixed. We do not have a clear plan for this yet. My 
point here is that we should allow portable strategies to make the strategy-related 
decisions (the size and number of TMs, and when to create and release them).

> Enable Slot Resource Profile for Resource Management
> 
>
> Key: FLINK-10640
> URL: https://issues.apache.org/jira/browse/FLINK-10640
> Project: Flink
>  Issue Type: New Feature
>  Components: ResourceManager
>Reporter: Tony Xintong Song
>Priority: Major
>


