[jira] [Commented] (FLINK-19151) Flink does not normalize container resource with correct configurations when Yarn FairScheduler is used
[ https://issues.apache.org/jira/browse/FLINK-19151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17191985#comment-17191985 ] Yang Wang commented on FLINK-19151:

[~xintongsong] Thanks for the explanation. I agree with you that we could start with the short-term solution, since it does not require much effort and fixes the issue.

> Flink does not normalize container resource with correct configurations when Yarn FairScheduler is used
>
> Key: FLINK-19151
> URL: https://issues.apache.org/jira/browse/FLINK-19151
> Project: Flink
> Issue Type: Bug
> Components: Deployment / YARN
> Affects Versions: 1.11.2
> Reporter: Xintong Song
> Assignee: jinhai
> Priority: Major
> Labels: pull-request-available
>
> h3. Problem
> It is a Yarn protocol that the requested container resource will be normalized for allocation. That means the allocated container may have a different resource (larger than or equal to) compared to what was requested.
> Currently, Flink matches the allocated containers to the original requests by reading the Yarn configurations and calculating how the requested resources should be normalized.
> What has been overlooked is that Yarn FairScheduler (and its subclass SLSFairScheduler) overrides the normalization behavior. To be specific:
> * By default, Yarn normalizes container resources to integer multiples of "yarn.scheduler.minimum-allocation-[mb|vcores]".
> * FairScheduler normalizes container resources to integer multiples of "yarn.resource-types.[memory-mb|vcores].increment-allocation" (or the deprecated keys "yarn.scheduler.increment-allocation-[mb|vcores]"), while making sure the resource is no less than "yarn.scheduler.minimum-allocation-[mb|vcores]".
> h3. Proposal for short-term solution
> To fix this problem, a quick and easy way is to also read the Yarn configuration to learn which scheduler is used, and perform the normalization calculations accordingly. This should be good enough to cover the behaviors of all the schedulers that Yarn currently provides. The limitation is that Flink will not be able to deal with custom Yarn schedulers that override the normalization behaviors.
> h3. Proposal for long-term solution
> For the long term, it would be good to use Yarn ContainerRequest#allocationRequestId to match the allocated containers with the original requests, so that Flink no longer needs to understand how Yarn normalizes container resources.
> Yarn ContainerRequest#allocationRequestId was introduced in Hadoop 2.9, while at the moment Flink claims to be compatible with Hadoop 2.4+. Therefore, this solution would not work right now.
> Another idea is to support various Hadoop versions with different container matching logics. We can abstract the container matching logic into a dedicated component and provide different implementations for it. This will allow Flink to take advantage of the new versions (e.g., work well with custom schedulers), while staying compatible with the old versions without those advantages.
> Given that we need the resource-based matching anyway for the old Hadoop versions, and the cost of maintaining two sets of matching logic, I tend to think of this approach as a back-up option to be worked on when we indeed see a need for it.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
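The two normalization rules described in the issue can be sketched as follows. This is a minimal, hypothetical illustration in plain Java of the rounding behavior, not Flink's actual matching code; it handles memory only, in MB, and the method names are made up for this sketch.

```java
// Hypothetical sketch of the two Yarn normalization rules discussed above.
// Memory only, in MB; vcores follow the same pattern with their own keys.
public class NormalizationSketch {

    // Default behavior: round the request up to an integer multiple of
    // "yarn.scheduler.minimum-allocation-mb".
    static long normalizeDefault(long requestedMb, long minAllocMb) {
        return ((requestedMb + minAllocMb - 1) / minAllocMb) * minAllocMb;
    }

    // FairScheduler behavior: round the request up to an integer multiple of
    // "yarn.resource-types.memory-mb.increment-allocation", while making sure
    // the result is no less than "yarn.scheduler.minimum-allocation-mb".
    static long normalizeFair(long requestedMb, long minAllocMb, long incrementMb) {
        long normalized = ((requestedMb + incrementMb - 1) / incrementMb) * incrementMb;
        return Math.max(normalized, minAllocMb);
    }

    public static void main(String[] args) {
        // With min=1024 and increment=512, a 1536 MB request is normalized to
        // 2048 MB by the default rule but stays 1536 MB under FairScheduler --
        // exactly the mismatch that breaks Flink's request matching.
        System.out.println(normalizeDefault(1536, 1024)); // 2048
        System.out.println(normalizeFair(1536, 1024, 512)); // 1536
    }
}
```

The example makes the bug concrete: Flink computes the expected container size with the default rule, so under FairScheduler the allocated 1536 MB container would not match the expected 2048 MB request.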
[ https://issues.apache.org/jira/browse/FLINK-19151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17191956#comment-17191956 ] Xintong Song commented on FLINK-19151:

Thanks for the input, [~fly_in_gis]. I think how we support various Hadoop versions w.r.t. the API and feature differences is a big topic, which probably deserves a separate discussion thread. In addition to the container allocation request id, there are currently other usages of Yarn APIs/features in Flink that are not supported by all Hadoop versions, e.g., getting previous-attempt containers and scheduler resource types from the register-application response, requesting containers with external resources (at the moment, GPU), submitting applications with specific tags and node labels, etc. I think it would be better to take all these issues into consideration and try to come up with some general principles for how we handle such version diversities. Such a discussion could take some time; in the meantime, it might be good to fix the problem with the proposed short-term solution, which should not require much effort. WDYT?
[ https://issues.apache.org/jira/browse/FLINK-19151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17191917#comment-17191917 ] Yang Wang commented on FLINK-19151:

I prefer the long-term solution, and +1 for the different implementations based on Hadoop versions. Currently, many Hadoop versions are going EOL, including [2.0.x - 2.9.x] and [3.0.x], and the community strongly suggests upgrading to hadoop-3.1, which should be stable enough. [https://cwiki.apache.org/confluence/display/HADOOP/EOL+(End-of-life)+Release+Branches]
[ https://issues.apache.org/jira/browse/FLINK-19151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17191912#comment-17191912 ] Xintong Song commented on FLINK-19151:

[~csbliss] Sure, thanks for volunteering. I have assigned you. Please make sure you read the community [contribution|https://flink.apache.org/contributing/contribute-code.html] and [code style|https://flink.apache.org/contributing/code-style-and-quality-preamble.html] guidelines in advance, and go ahead.
[ https://issues.apache.org/jira/browse/FLINK-19151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17191911#comment-17191911 ] jinhai commented on FLINK-19151:

[~xintongsong] Can I have this issue?
[ https://issues.apache.org/jira/browse/FLINK-19151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17191637#comment-17191637 ] jinhai commented on FLINK-19151:

[~xintongsong] Yes, we can set the unit resources in the constructor of YarnResourceManager.
[ https://issues.apache.org/jira/browse/FLINK-19151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17191633#comment-17191633 ] Xintong Song commented on FLINK-19151:

+1 on adding unit memory/vcores in WorkerSpecContainerResourceAdapter and deciding the unit resources according to the Yarn scheduler. I would suggest deciding the unit resources outside `WorkerSpecContainerResourceAdapter`, so that the adapter does not need to be aware of the Yarn scheduler differences. It should also make the adapter easier to test.
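Deciding the unit resources outside the adapter, based on the configured scheduler, could look roughly like this. This is a hypothetical sketch, not Flink's actual code: a plain `Map` stands in for Hadoop's `YarnConfiguration` so the example is self-contained, while the configuration keys and the FairScheduler class name are the standard Yarn ones, and the default values are illustrative assumptions.

```java
import java.util.Map;

// Hypothetical sketch: pick the memory unit (MB) that allocated containers
// will be integer multiples of, based on which scheduler Yarn is configured
// with. A plain Map stands in for Hadoop's YarnConfiguration here.
public class UnitResourceDecider {
    static final String SCHEDULER_KEY = "yarn.resourcemanager.scheduler.class";
    static final String FAIR_SCHEDULER =
        "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler";

    static long unitMemMb(Map<String, String> conf) {
        if (FAIR_SCHEDULER.equals(conf.get(SCHEDULER_KEY))) {
            // FairScheduler normalizes to the increment-allocation unit instead
            // of the minimum allocation. (Illustrative default of 1024 MB.)
            return Long.parseLong(conf.getOrDefault(
                "yarn.resource-types.memory-mb.increment-allocation", "1024"));
        }
        // Default/other schedulers: normalize to the minimum allocation.
        return Long.parseLong(
            conf.getOrDefault("yarn.scheduler.minimum-allocation-mb", "1024"));
    }
}
```

The decided unit would then be passed into `WorkerSpecContainerResourceAdapter` (e.g., as the `unitMemMB`/`unitVcore` constructor arguments proposed above), keeping the adapter itself scheduler-agnostic and easy to test with fixed units.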
[ https://issues.apache.org/jira/browse/FLINK-19151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17191623#comment-17191623 ] jinhai commented on FLINK-19151:

[~xintongsong] Can we add two fields to the class WorkerSpecContainerResourceAdapter, unitMemMB and unitVcore, and calculate the unit values in the WorkerSpecContainerResourceAdapter constructor according to which Yarn scheduler is in use, such as FairScheduler or another scheduler?