[jira] [Commented] (FLINK-19151) Flink does not normalize container resource with correct configurations when Yarn FairScheduler is used

2020-09-08 Thread Yang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17191985#comment-17191985
 ] 

Yang Wang commented on FLINK-19151:
---

[~xintongsong] Thanks for the explanation. I agree with you that we could start 
with the short-term solution, since it does not need too much effort and could 
fix the issue. 

> Flink does not normalize container resource with correct configurations when 
> Yarn FairScheduler is used 
> 
>
> Key: FLINK-19151
> URL: https://issues.apache.org/jira/browse/FLINK-19151
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.11.2
>Reporter: Xintong Song
>Assignee: jinhai
>Priority: Major
>  Labels: pull-request-available
>
> h3. Problem
> It's a Yarn protocol that the requested container resource will be normalized 
> for allocation. That means the allocated container may have different 
> resources (larger than or equal to) compared to what was requested.
> Currently, Flink matches the allocated containers to the original requests by 
> reading the Yarn configurations and calculating how the requested resources 
> should be normalized.
> What has been overlooked is that Yarn FairScheduler (and its subclass 
> SLSFairScheduler) overrides the normalization behavior. To be specific:
>  * By default, Yarn normalizes container resources to an integer multiple of 
> "yarn.scheduler.minimum-allocation-[mb|vcores]".
>  * FairScheduler normalizes container resources to an integer multiple of 
> "yarn.resource-types.[memory-mb|vcores].increment-allocation" (or the 
> deprecated keys "yarn.scheduler.increment-allocation-[mb|vcores]"), while 
> making sure the resource is no less than 
> "yarn.scheduler.minimum-allocation-[mb|vcores]".
> h3. Proposal for short term solution
> To fix this problem, a quick and easy way is to also read the Yarn 
> configuration to learn which scheduler is used, and perform the normalization 
> calculations accordingly. This should be good enough to cover the behaviors 
> of all the schedulers that Yarn currently provides. The limitation is that 
> Flink will not be able to deal with custom Yarn schedulers that override the 
> normalization behavior.
> h3. Proposal for long term solution
> For the long term, it would be good to use Yarn 
> ContainerRequest#allocationRequestId to match the allocated containers with 
> the original requests, so that Flink no longer needs to understand how Yarn 
> normalizes container resources.
> However, Yarn ContainerRequest#allocationRequestId was introduced in Hadoop 
> 2.9, while Flink currently claims to be compatible with Hadoop 2.4+. 
> Therefore, this solution would not work at the moment.
> Another idea is to support various Hadoop versions with different container 
> matching logics. We can abstract the container matching logic into a 
> dedicated component and provide different implementations for it. This will 
> allow Flink to take advantage of the new versions (e.g., work well with 
> custom schedulers), while staying compatible with the old versions without 
> those advantages.
> Given that we need the resource-based matching anyway for the old Hadoop 
> versions, and the cost of maintaining two sets of matching logic, I tend to 
> think of this approach as a back-up option to be worked on when we indeed 
> see a need for it.
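As a concrete illustration of the two normalization rules described in the issue, here is a minimal sketch of the memory arithmetic (vcores work analogously). The class and method names are illustrative only, not Yarn API:

```java
// Sketch of the two normalization rules; names are illustrative, not Yarn API.
public class YarnNormalization {

    /** Default schedulers: round up to an integer multiple of the minimum allocation. */
    static long normalizeDefault(long requestedMb, long minAllocMb) {
        return ((requestedMb + minAllocMb - 1) / minAllocMb) * minAllocMb;
    }

    /** FairScheduler: round up to a multiple of the increment allocation,
     *  but never below the minimum allocation. */
    static long normalizeFair(long requestedMb, long minAllocMb, long incrementMb) {
        long rounded = ((requestedMb + incrementMb - 1) / incrementMb) * incrementMb;
        return Math.max(rounded, minAllocMb);
    }

    public static void main(String[] args) {
        // A 1500 MB request with minimum-allocation 1024 and increment-allocation 512
        // is rounded differently by the two rules:
        System.out.println(normalizeDefault(1500, 1024));   // 2048
        System.out.println(normalizeFair(1500, 1024, 512)); // 1536
    }
}
```

This mismatch (2048 vs. 1536 for the same request) is exactly why Flink's resource-based matching fails when FairScheduler is in use.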



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19151) Flink does not normalize container resource with correct configurations when Yarn FairScheduler is used

2020-09-07 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17191956#comment-17191956
 ] 

Xintong Song commented on FLINK-19151:
--

Thanks for the input, [~fly_in_gis].

I think how we support various Hadoop versions w.r.t. the API and feature 
differences is a big topic, which probably deserves a separate discussion 
thread. 

In addition to the container allocation request id, there are currently other 
usages of Yarn APIs/features in Flink that are not supported by all Hadoop 
versions, e.g., getting previous-attempt containers and scheduler resource 
types from the register-application response, requesting containers with 
external resources (at the moment, GPU), and submitting applications with 
specific tags and node labels. I think it would be better to take all these 
issues into consideration and try to come up with some general principles for 
how we handle such version diversity.

Such a discussion could take some time. In the meantime, it might be good to 
fix the problem with the proposed short-term solution, which should not 
require much effort.

WDYT?






[jira] [Commented] (FLINK-19151) Flink does not normalize container resource with correct configurations when Yarn FairScheduler is used

2020-09-07 Thread Yang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17191917#comment-17191917
 ] 

Yang Wang commented on FLINK-19151:
---

I prefer the long-term solution, and +1 for the different implementations 
based on Hadoop versions. Currently, many Hadoop versions are going EOL, 
including [2.0.x - 2.9.x] and [3.0.x], and the community strongly suggests 
upgrading to hadoop-3.1, which should be stable enough.

 

[https://cwiki.apache.org/confluence/display/HADOOP/EOL+(End-of-life)+Release+Branches]






[jira] [Commented] (FLINK-19151) Flink does not normalize container resource with correct configurations when Yarn FairScheduler is used

2020-09-07 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17191912#comment-17191912
 ] 

Xintong Song commented on FLINK-19151:
--

[~csbliss]

Sure. Thanks for volunteering. I have assigned you.

Please make sure you read the community 
[contribution|https://flink.apache.org/contributing/contribute-code.html] and 
[code style|https://flink.apache.org/contributing/code-style-and-quality-preamble.html] 
guidelines in advance, then go ahead.






[jira] [Commented] (FLINK-19151) Flink does not normalize container resource with correct configurations when Yarn FairScheduler is used

2020-09-07 Thread jinhai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17191911#comment-17191911
 ] 

jinhai commented on FLINK-19151:


[~xintongsong] Can I have this issue?






[jira] [Commented] (FLINK-19151) Flink does not normalize container resource with correct configurations when Yarn FairScheduler is used

2020-09-07 Thread jinhai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17191637#comment-17191637
 ] 

jinhai commented on FLINK-19151:


[~xintongsong] Yes, we can set the unit resources in the constructor of 
YarnResourceManager.






[jira] [Commented] (FLINK-19151) Flink does not normalize container resource with correct configurations when Yarn FairScheduler is used

2020-09-07 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17191633#comment-17191633
 ] 

Xintong Song commented on FLINK-19151:
--

+1 on adding unit memory/vcore in WorkerSpecContainerResourceAdapter and 
deciding the unit resources according to the Yarn scheduler.

I would suggest deciding the unit resources outside 
`WorkerSpecContainerResourceAdapter`. The adapter does not need to be aware of 
the Yarn scheduler differences, and this should also make the adapter easier 
to test.
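Deciding the unit outside the adapter might look roughly like the sketch below. A plain Map stands in for Hadoop's YarnConfiguration so the sketch stays self-contained, and the resolver class and method names are hypothetical; the configuration keys follow the issue description:

```java
import java.util.Map;

// Hypothetical sketch: resolve the scheduler's rounding unit from configuration,
// then pass it into the (scheduler-agnostic) adapter. A Map stands in for
// Hadoop's YarnConfiguration to keep the sketch self-contained.
public class UnitResourceResolver {
    static final String SCHEDULER_CLASS = "yarn.resourcemanager.scheduler.class";
    static final String MIN_ALLOC_MB = "yarn.scheduler.minimum-allocation-mb";
    static final String FAIR_INCREMENT_MB = "yarn.resource-types.memory-mb.increment-allocation";

    /** Returns the memory unit (MB) the active scheduler rounds requests to. */
    static long unitMemMb(Map<String, String> conf) {
        String scheduler = conf.getOrDefault(SCHEDULER_CLASS, "");
        if (scheduler.contains("FairScheduler")) {
            // FairScheduler rounds to the increment allocation (default 1024 MB).
            return Long.parseLong(conf.getOrDefault(FAIR_INCREMENT_MB, "1024"));
        }
        // Other schedulers round to the minimum allocation itself.
        return Long.parseLong(conf.getOrDefault(MIN_ALLOC_MB, "1024"));
    }

    public static void main(String[] args) {
        Map<String, String> fairConf = Map.of(
                SCHEDULER_CLASS,
                "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler",
                FAIR_INCREMENT_MB, "512");
        System.out.println(unitMemMb(fairConf));                  // 512
        System.out.println(unitMemMb(Map.of(MIN_ALLOC_MB, "2048"))); // 2048
    }
}
```

The resolved unit would then be handed to the adapter's constructor, so the adapter itself never inspects the scheduler configuration.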






[jira] [Commented] (FLINK-19151) Flink does not normalize container resource with correct configurations when Yarn FairScheduler is used

2020-09-07 Thread jinhai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17191623#comment-17191623
 ] 

jinhai commented on FLINK-19151:


[~xintongsong]

Can we add two fields, unitMemMB and unitVcore, to the class 
WorkerSpecContainerResourceAdapter, and calculate the unit values in its 
constructor according to the Yarn scheduler in use (FairScheduler or another 
scheduler)?
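A minimal sketch of what that could look like, assuming hypothetical names (the real WorkerSpecContainerResourceAdapter differs):

```java
// Illustrative sketch of the proposal: the adapter stores a unit-memory field,
// computed from the scheduler type at construction time. Names are hypothetical.
public class WorkerSpecContainerResourceAdapterSketch {
    private final long minMemMb;
    private final long unitMemMb;

    WorkerSpecContainerResourceAdapterSketch(
            boolean isFairScheduler, long minAllocMb, long incrementMb) {
        this.minMemMb = minAllocMb;
        // FairScheduler rounds to the increment allocation;
        // other schedulers round to the minimum allocation itself.
        this.unitMemMb = isFairScheduler ? incrementMb : minAllocMb;
    }

    /** Normalize a requested memory size the way the active scheduler would. */
    long normalizeMemMb(long requestedMb) {
        long rounded = ((requestedMb + unitMemMb - 1) / unitMemMb) * unitMemMb;
        return Math.max(rounded, minMemMb);
    }

    public static void main(String[] args) {
        WorkerSpecContainerResourceAdapterSketch fair =
                new WorkerSpecContainerResourceAdapterSketch(true, 1024, 512);
        System.out.println(fair.normalizeMemMb(1500)); // 1536
    }
}
```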



