YARN-6599. Support anti-affinity constraint via AppPlacementAllocator. (Wangda Tan via asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b82addc0 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b82addc0 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b82addc0 Branch: refs/heads/YARN-6592 Commit: b82addc0a51050accba4b705a613dc274e9ac90a Parents: 12d4e3b Author: Arun Suresh <asur...@apache.org> Authored: Thu Jan 18 14:10:30 2018 -0800 Committer: Arun Suresh <asur...@apache.org> Committed: Thu Jan 18 14:10:30 2018 -0800 ---------------------------------------------------------------------- .../v2/app/rm/TestRMContainerAllocator.java | 15 +- .../sls/scheduler/SLSCapacityScheduler.java | 15 +- .../yarn/sls/scheduler/SLSFairScheduler.java | 12 +- .../dev-support/findbugs-exclude.xml | 8 + .../yarn/api/resource/PlacementConstraints.java | 43 +- .../hadoop/yarn/conf/YarnConfiguration.java | 2 +- ...SchedulerInvalidResoureRequestException.java | 47 ++ .../api/impl/TestAMRMClientOnRMRestart.java | 9 +- .../impl/pb/AllocateRequestPBImpl.java | 1 + .../server/scheduler/SchedulerRequestKey.java | 11 + .../resourcemanager/DefaultAMSProcessor.java | 13 +- .../rmapp/attempt/RMAppAttemptImpl.java | 5 +- .../scheduler/AbstractYarnScheduler.java | 3 +- .../scheduler/AppSchedulingInfo.java | 205 +++++-- .../ApplicationPlacementAllocatorFactory.java | 68 +++ .../scheduler/ApplicationPlacementFactory.java | 63 --- .../scheduler/ContainerUpdateContext.java | 4 +- .../scheduler/SchedulerApplicationAttempt.java | 20 +- .../scheduler/YarnScheduler.java | 15 +- .../scheduler/capacity/CapacityScheduler.java | 54 +- .../CapacitySchedulerConfiguration.java | 5 + .../allocator/RegularContainerAllocator.java | 3 +- .../scheduler/common/ContainerRequest.java | 12 + .../scheduler/common/PendingAsk.java | 6 + .../scheduler/common/fica/FiCaSchedulerApp.java | 6 + .../constraint/AllocationTagsManager.java | 71 +-- .../constraint/AllocationTagsNamespaces.java | 31 -- 
.../constraint/PlacementConstraintsUtil.java | 165 ++++-- .../algorithm/DefaultPlacementAlgorithm.java | 2 +- .../processor/PlacementProcessor.java | 8 +- .../scheduler/fair/FairScheduler.java | 12 +- .../scheduler/fifo/FifoScheduler.java | 7 +- .../placement/AppPlacementAllocator.java | 66 ++- .../LocalityAppPlacementAllocator.java | 35 +- .../SingleConstraintAppPlacementAllocator.java | 531 +++++++++++++++++++ .../server/resourcemanager/Application.java | 9 +- .../yarn/server/resourcemanager/MockAM.java | 51 ++ .../attempt/TestRMAppAttemptTransitions.java | 10 +- .../rmcontainer/TestRMContainerImpl.java | 6 +- .../scheduler/TestAppSchedulingInfo.java | 4 +- .../capacity/CapacitySchedulerTestBase.java | 79 +++ .../capacity/TestCapacityScheduler.java | 90 +--- .../TestCapacitySchedulerAsyncScheduling.java | 2 +- .../TestCapacitySchedulerAutoQueueCreation.java | 2 +- ...apacitySchedulerSchedulingRequestUpdate.java | 260 +++++++++ .../capacity/TestIncreaseAllocationExpirer.java | 2 +- ...estSchedulingRequestContainerAllocation.java | 277 ++++++++++ ...hedulingRequestContainerAllocationAsync.java | 139 +++++ .../scheduler/capacity/TestUtils.java | 2 + .../constraint/TestAllocationTagsManager.java | 30 +- .../TestPlacementConstraintsUtil.java | 36 +- .../scheduler/fair/FairSchedulerTestBase.java | 6 +- .../fair/TestContinuousScheduling.java | 10 +- .../scheduler/fair/TestFairScheduler.java | 30 +- .../scheduler/fifo/TestFifoScheduler.java | 28 +- ...stSingleConstraintAppPlacementAllocator.java | 403 ++++++++++++++ 56 files changed, 2557 insertions(+), 492 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java ---------------------------------------------------------------------- diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index 85e4181..7875917 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -111,6 +111,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.client.api.TimelineV2Client; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; @@ -1751,6 +1752,7 @@ public class TestRMContainerAllocator { super(); try { Configuration conf = new Configuration(); + init(conf); reinitialize(conf, rmContext); } catch (IOException ie) { LOG.info("add application failed with ", ie); @@ -1769,8 +1771,8 @@ public class TestRMContainerAllocator { @Override public synchronized Allocation allocate( ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask, - List<ContainerId> release, List<String> blacklistAdditions, - List<String> blacklistRemovals, + List<SchedulingRequest> schedulingRequests, List<ContainerId> release, + List<String> blacklistAdditions, List<String> blacklistRemovals, ContainerUpdates updateRequests) { List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>(); for (ResourceRequest req : ask) { @@ -1785,7 +1787,7 @@ public class TestRMContainerAllocator { 
lastBlacklistAdditions = blacklistAdditions; lastBlacklistRemovals = blacklistRemovals; Allocation allocation = super.allocate( - applicationAttemptId, askCopy, release, blacklistAdditions, + applicationAttemptId, askCopy, schedulingRequests, release, blacklistAdditions, blacklistRemovals, updateRequests); if (forceResourceLimit != null) { // Test wants to force the non-default resource limit @@ -1805,6 +1807,7 @@ public class TestRMContainerAllocator { super(); try { Configuration conf = new Configuration(); + init(conf); reinitialize(conf, rmContext); } catch (IOException ie) { LOG.info("add application failed with ", ie); @@ -1815,8 +1818,8 @@ public class TestRMContainerAllocator { @Override public synchronized Allocation allocate( ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask, - List<ContainerId> release, List<String> blacklistAdditions, - List<String> blacklistRemovals, + List<SchedulingRequest> schedulingRequests, List<ContainerId> release, + List<String> blacklistAdditions, List<String> blacklistRemovals, ContainerUpdates updateRequests) { List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>(); for (ResourceRequest req : ask) { @@ -1827,7 +1830,7 @@ public class TestRMContainerAllocator { } SecurityUtil.setTokenServiceUseIp(false); Allocation normalAlloc = super.allocate( - applicationAttemptId, askCopy, release, + applicationAttemptId, askCopy, schedulingRequests, release, blacklistAdditions, blacklistRemovals, updateRequests); List<Container> containers = normalAlloc.getContainers(); if(containers.size() > 0) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java index 6848b22..35f3ed1 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; @@ -42,9 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; @@ -100,16 +99,17 @@ public class SLSCapacityScheduler extends CapacityScheduler implements @Override public Allocation allocate(ApplicationAttemptId attemptId, - List<ResourceRequest> resourceRequests, List<ContainerId> containerIds, 
- List<String> strings, List<String> strings2, - ContainerUpdates updateRequests) { + List<ResourceRequest> resourceRequests, + List<SchedulingRequest> schedulingRequests, List<ContainerId> containerIds, + List<String> strings, List<String> strings2, ContainerUpdates updateRequests) { if (metricsON) { final Timer.Context context = schedulerMetrics.getSchedulerAllocateTimer() .time(); Allocation allocation = null; try { allocation = super - .allocate(attemptId, resourceRequests, containerIds, strings, + .allocate(attemptId, resourceRequests, schedulingRequests, + containerIds, strings, strings2, updateRequests); return allocation; } finally { @@ -123,7 +123,8 @@ public class SLSCapacityScheduler extends CapacityScheduler implements } } } else { - return super.allocate(attemptId, resourceRequests, containerIds, strings, + return super.allocate(attemptId, resourceRequests, schedulingRequests, + containerIds, strings, strings2, updateRequests); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java index 8e49c51..c27ab3e 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.exceptions.YarnException; import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; @@ -39,8 +40,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptR import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.sls.SLSRunner; import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; @@ -94,7 +93,8 @@ public class SLSFairScheduler extends FairScheduler @Override public Allocation allocate(ApplicationAttemptId attemptId, - List<ResourceRequest> resourceRequests, List<ContainerId> containerIds, + List<ResourceRequest> resourceRequests, + List<SchedulingRequest> schedulingRequests, List<ContainerId> containerIds, List<String> blacklistAdditions, List<String> blacklistRemovals, ContainerUpdates updateRequests) { if (metricsON) { @@ -102,7 +102,8 @@ public class SLSFairScheduler extends FairScheduler .time(); Allocation allocation = null; try { - allocation = super.allocate(attemptId, resourceRequests, containerIds, + allocation = super.allocate(attemptId, resourceRequests, + schedulingRequests, containerIds, blacklistAdditions, blacklistRemovals, updateRequests); return allocation; } finally { @@ -116,7 +117,8 @@ public class SLSFairScheduler extends FairScheduler } } } else { - return super.allocate(attemptId, resourceRequests, containerIds, + return super.allocate(attemptId, resourceRequests, schedulingRequests, + containerIds, blacklistAdditions, blacklistRemovals, updateRequests); } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 6a10312..81b8825 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -650,4 +650,12 @@ <Method name="equals" /> <Bug pattern="EQ_OVERRIDING_EQUALS_NOT_SYMMETRIC" /> </Match> + + <!-- Null pointer exception needs to be ignored here as Findbugs doesn't properly detect code logic --> + <Match> + <Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SingleConstraintAppPlacementAllocator" /> + <Method name="validateAndSetSchedulingRequest" /> + <Bug pattern="NP_NULL_ON_SOME_PATH" /> + </Match> + </FindBugsFilter> http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java index c8991cb..ba1beae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java @@ -20,8 +20,12 @@ package org.apache.hadoop.yarn.api.resource; import java.util.concurrent.TimeUnit; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import 
org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.DelayedOr; @@ -47,6 +51,14 @@ public final class PlacementConstraints { public static final String NODE = PlacementConstraint.NODE_SCOPE; public static final String RACK = PlacementConstraint.RACK_SCOPE; + public static final String NODE_PARTITION = "yarn_node_partition/"; + + private static final String APPLICATION_LABEL_PREFIX = + "yarn_application_label/"; + + @InterfaceAudience.Private + public static final String APPLICATION_LABEL_INTRA_APPLICATION = + APPLICATION_LABEL_PREFIX + "%intra_app%"; /** * Creates a constraint that requires allocations to be placed on nodes that @@ -187,6 +199,20 @@ public final class PlacementConstraints { } /** + * Constructs a target expression on a node partition. It is satisfied if + * the specified node partition has one of the specified nodePartitions + * + * @param nodePartitions the set of values that the attribute should take + * values from + * @return the resulting expression on the node attribute + */ + public static TargetExpression nodePartition( + String... nodePartitions) { + return new TargetExpression(TargetType.NODE_ATTRIBUTE, NODE_PARTITION, + nodePartitions); + } + + /** * Constructs a target expression on an allocation tag. It is satisfied if * the there are allocations with one of the given tags. * @@ -198,6 +224,22 @@ public final class PlacementConstraints { return new TargetExpression(TargetType.ALLOCATION_TAG, null, allocationTags); } + + /** + * Constructs a target expression on an allocation tag. 
It is satisfied if + * the there are allocations with one of the given tags. Comparing to + * {@link PlacementTargets#allocationTag(String...)}, this only check tags + * within the application. + * + * @param allocationTags the set of tags that the attribute should take + * values from + * @return the resulting expression on the allocation tags + */ + public static TargetExpression allocationTagToIntraApp( + String... allocationTags) { + return new TargetExpression(TargetType.ALLOCATION_TAG, + APPLICATION_LABEL_INTRA_APPLICATION, allocationTags); + } } // Creation of compound constraints. @@ -277,5 +319,4 @@ public final class PlacementConstraints { public static PlacementConstraint build(AbstractConstraint constraintExpr) { return constraintExpr.build(); } - } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index af83d8d..ea8f367 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -541,7 +541,7 @@ public class YarnConfiguration extends Configuration { public static final String RM_PLACEMENT_CONSTRAINTS_ENABLED = RM_PREFIX + "placement-constraints.enabled"; - public static final boolean DEFAULT_RM_PLACEMENT_CONSTRAINTS_ENABLED = true; + public static final boolean DEFAULT_RM_PLACEMENT_CONSTRAINTS_ENABLED = false; public static final String RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS = RM_PREFIX + "placement-constraints.retry-attempts"; 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/SchedulerInvalidResoureRequestException.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/SchedulerInvalidResoureRequestException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/SchedulerInvalidResoureRequestException.java new file mode 100644 index 0000000..f55ad83 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/SchedulerInvalidResoureRequestException.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.exceptions; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * This exception is thrown when any issue inside scheduler to handle a new or + * updated {@link org.apache.hadoop.yarn.api.records.SchedulingRequest}/ + * {@link org.apache.hadoop.yarn.api.records.ResourceRequest} add to the + * scheduler. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class SchedulerInvalidResoureRequestException extends YarnRuntimeException { + private static final long serialVersionUID = 10081123982L; + + public SchedulerInvalidResoureRequestException(String message) { + super(message); + } + + public SchedulerInvalidResoureRequestException(Throwable cause) { + super(cause); + } + + public SchedulerInvalidResoureRequestException(String message, + Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java index 337d7d4..11d703d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import 
org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; @@ -545,6 +546,7 @@ public class TestAMRMClientOnRMRestart { super(); try { Configuration conf = new Configuration(); + init(conf); reinitialize(conf, rmContext); } catch (IOException ie) { assert (false); @@ -563,8 +565,8 @@ public class TestAMRMClientOnRMRestart { @Override public synchronized Allocation allocate( ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask, - List<ContainerId> release, List<String> blacklistAdditions, - List<String> blacklistRemovals, + List<SchedulingRequest> schedulingRequests, List<ContainerId> release, + List<String> blacklistAdditions, List<String> blacklistRemovals, ContainerUpdates updateRequests) { List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>(); for (ResourceRequest req : ask) { @@ -580,7 +582,8 @@ public class TestAMRMClientOnRMRestart { lastDecrease = updateRequests.getDecreaseRequests(); lastBlacklistAdditions = blacklistAdditions; lastBlacklistRemovals = blacklistRemovals; - return super.allocate(applicationAttemptId, askCopy, release, + return super.allocate(applicationAttemptId, askCopy, schedulingRequests, + release, blacklistAdditions, blacklistRemovals, updateRequests); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java index b460044..50672a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java @@ -194,6 +194,7 @@ public class AllocateRequestPBImpl extends AllocateRequest { public void setSchedulingRequests( List<SchedulingRequest> schedulingRequests) { if (schedulingRequests == null) { + builder.clearSchedulingRequests(); return; } initSchedulingRequests(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java index c4f37f6..0fce083 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java @@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import 
org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; /** @@ -45,6 +46,16 @@ public final class SchedulerRequestKey implements req.getAllocationRequestId(), null); } + /** + * Factory method to generate a SchedulerRequestKey from a SchedulingRequest. + * @param req SchedulingRequest + * @return SchedulerRequestKey + */ + public static SchedulerRequestKey create(SchedulingRequest req) { + return new SchedulerRequestKey(req.getPriority(), + req.getAllocationRequestId(), null); + } + public static SchedulerRequestKey create(UpdateContainerRequest req, SchedulerRequestKey schedulerRequestKey) { return new SchedulerRequestKey(schedulerRequestKey.getPriority(), http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java index 713947f..18ab473 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException; import 
org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; +import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -273,10 +274,14 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor { " state, ignore container allocate request."); allocation = EMPTY_ALLOCATION; } else { - allocation = - getScheduler().allocate(appAttemptId, ask, release, - blacklistAdditions, blacklistRemovals, - containerUpdateRequests); + try { + allocation = getScheduler().allocate(appAttemptId, ask, + request.getSchedulingRequests(), release, + blacklistAdditions, blacklistRemovals, containerUpdateRequests); + } catch (SchedulerInvalidResoureRequestException e) { + LOG.warn("Exceptions caught when scheduler handling requests"); + throw new YarnException(e); + } } if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index cf10be4..8c2f4e4 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -1113,8 +1113,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { Allocation amContainerAllocation = appAttempt.scheduler.allocate( appAttempt.applicationAttemptId, - appAttempt.amReqs, - EMPTY_CONTAINER_RELEASE_LIST, + appAttempt.amReqs, null, EMPTY_CONTAINER_RELEASE_LIST, amBlacklist.getBlacklistAdditions(), amBlacklist.getBlacklistRemovals(), new ContainerUpdates()); @@ -1140,7 +1139,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { // Acquire the AM container from the scheduler. Allocation amContainerAllocation = appAttempt.scheduler.allocate(appAttempt.applicationAttemptId, - EMPTY_CONTAINER_REQUEST_LIST, EMPTY_CONTAINER_RELEASE_LIST, null, + EMPTY_CONTAINER_REQUEST_LIST, null, EMPTY_CONTAINER_RELEASE_LIST, null, null, new ContainerUpdates()); // There must be at least one container allocated, because a // CONTAINER_ALLOCATED is emitted after an RMContainer is constructed, http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 74456f6..e3914c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.ResourceSizing; import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerError; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; @@ -1151,7 +1152,7 @@ public abstract class AbstractYarnScheduler * * @param asks resource requests */ - protected void normalizeRequests(List<ResourceRequest> asks) { + protected void normalizeResourceRequests(List<ResourceRequest> asks) { for (ResourceRequest ask: asks) { ask.setCapability(getNormalizedResource(ask.getCapability())); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 8858d3b..7d6f233 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -41,6 +41,8 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; @@ -49,7 +51,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.Applicatio import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalityAppPlacementAllocator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PendingAskUpdateResult; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SingleConstraintAppPlacementAllocator; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.util.resource.Resources; /** @@ -91,11 
+95,12 @@ public class AppSchedulingInfo { public final ContainerUpdateContext updateContext; public final Map<String, String> applicationSchedulingEnvs = new HashMap<>(); + private final RMContext rmContext; public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user, Queue queue, AbstractUsersManager abstractUsersManager, long epoch, ResourceUsage appResourceUsage, - Map<String, String> applicationSchedulingEnvs) { + Map<String, String> applicationSchedulingEnvs, RMContext rmContext) { this.applicationAttemptId = appAttemptId; this.applicationId = appAttemptId.getApplicationId(); this.queue = queue; @@ -105,6 +110,7 @@ public class AppSchedulingInfo { epoch << ResourceManager.EPOCH_BIT_SHIFT); this.appResourceUsage = appResourceUsage; this.applicationSchedulingEnvs.putAll(applicationSchedulingEnvs); + this.rmContext = rmContext; ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); updateContext = new ContainerUpdateContext(this); @@ -163,74 +169,153 @@ public class AppSchedulingInfo { * application, by asking for more resources and releasing resources acquired * by the application. 
* - * @param requests - * resources to be acquired + * @param resourceRequests resource requests to be allocated * @param recoverPreemptedRequestForAContainer - * recover ResourceRequest on preemption + * recover ResourceRequest/SchedulingRequest on preemption * @return true if any resource was updated, false otherwise */ - public boolean updateResourceRequests(List<ResourceRequest> requests, + public boolean updateResourceRequests(List<ResourceRequest> resourceRequests, boolean recoverPreemptedRequestForAContainer) { - if (null == requests || requests.isEmpty()) { - return false; + // Flag to track if any incoming requests update "ANY" requests + boolean offswitchResourcesUpdated; + + writeLock.lock(); + try { + // Update AppPlacementAllocator by requests + offswitchResourcesUpdated = internalAddResourceRequests( + recoverPreemptedRequestForAContainer, resourceRequests); + } finally { + writeLock.unlock(); } + return offswitchResourcesUpdated; + } + + /** + * The ApplicationMaster is updating resource requirements for the + * application, by asking for more resources and releasing resources acquired + * by the application. 
+ * + * @param dedupRequests (dedup) resource requests to be allocated + * @param recoverPreemptedRequestForAContainer + * recover ResourceRequest/SchedulingRequest on preemption + * @return true if any resource was updated, false otherwise + */ + public boolean updateResourceRequests( + Map<SchedulerRequestKey, Map<String, ResourceRequest>> dedupRequests, + boolean recoverPreemptedRequestForAContainer) { // Flag to track if any incoming requests update "ANY" requests - boolean offswitchResourcesUpdated = false; + boolean offswitchResourcesUpdated; + writeLock.lock(); try { - this.writeLock.lock(); - - // A map to group resource requests and dedup - Map<SchedulerRequestKey, Map<String, ResourceRequest>> dedupRequests = - new HashMap<>(); + // Update AppPlacementAllocator by requests + offswitchResourcesUpdated = internalAddResourceRequests( + recoverPreemptedRequestForAContainer, dedupRequests); + } finally { + writeLock.unlock(); + } - // Group resource request by schedulerRequestKey and resourceName - for (ResourceRequest request : requests) { - SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request); - if (!dedupRequests.containsKey(schedulerKey)) { - dedupRequests.put(schedulerKey, new HashMap<>()); - } - dedupRequests.get(schedulerKey).put(request.getResourceName(), request); - } + return offswitchResourcesUpdated; + } - // Update AppPlacementAllocator by dedup requests. - offswitchResourcesUpdated = - addRequestToAppPlacement( - recoverPreemptedRequestForAContainer, dedupRequests); + /** + * The ApplicationMaster is updating resource requirements for the + * application, by asking for more resources and releasing resources acquired + * by the application. 
+ * + * @param schedulingRequests resource requests to be allocated + * @param recoverPreemptedRequestForAContainer + * recover ResourceRequest/SchedulingRequest on preemption + * @return true if any resource was updated, false otherwise + */ + public boolean updateSchedulingRequests( + List<SchedulingRequest> schedulingRequests, + boolean recoverPreemptedRequestForAContainer) { + // Flag to track if any incoming requests update "ANY" requests + boolean offswitchResourcesUpdated; - return offswitchResourcesUpdated; + writeLock.lock(); + try { + // Update AppPlacementAllocator by requests + offswitchResourcesUpdated = addSchedulingRequests( + recoverPreemptedRequestForAContainer, schedulingRequests); } finally { - this.writeLock.unlock(); + writeLock.unlock(); } + + return offswitchResourcesUpdated; } public void removeAppPlacement(SchedulerRequestKey schedulerRequestKey) { schedulerKeyToAppPlacementAllocator.remove(schedulerRequestKey); } - boolean addRequestToAppPlacement( + private boolean addSchedulingRequests( + boolean recoverPreemptedRequestForAContainer, + List<SchedulingRequest> schedulingRequests) { + // Do we need to update pending resource for app/queue, etc.? 
+ boolean requireUpdatePendingResource = false; + + for (SchedulingRequest request : schedulingRequests) { + SchedulerRequestKey schedulerRequestKey = SchedulerRequestKey.create( + request); + + AppPlacementAllocator appPlacementAllocator = + getAndAddAppPlacementAllocatorIfNotExist(schedulerRequestKey, + SingleConstraintAppPlacementAllocator.class.getCanonicalName()); + + // Update AppPlacementAllocator + PendingAskUpdateResult pendingAmountChanges = + appPlacementAllocator.updatePendingAsk(schedulerRequestKey, + request, recoverPreemptedRequestForAContainer); + + if (null != pendingAmountChanges) { + updatePendingResources(pendingAmountChanges, schedulerRequestKey, + queue.getMetrics()); + requireUpdatePendingResource = true; + } + } + + return requireUpdatePendingResource; + } + + /** + * Get and insert AppPlacementAllocator if it doesn't exist, this should be + * protected by write lock. + * @param schedulerRequestKey schedulerRequestKey + * @param placementTypeClass placementTypeClass + * @return AppPlacementAllocator + */ + private AppPlacementAllocator<SchedulerNode> getAndAddAppPlacementAllocatorIfNotExist( + SchedulerRequestKey schedulerRequestKey, String placementTypeClass) { + AppPlacementAllocator<SchedulerNode> appPlacementAllocator; + if ((appPlacementAllocator = schedulerKeyToAppPlacementAllocator.get( + schedulerRequestKey)) == null) { + appPlacementAllocator = + ApplicationPlacementAllocatorFactory.getAppPlacementAllocator( + placementTypeClass, this, schedulerRequestKey, rmContext); + schedulerKeyToAppPlacementAllocator.put(schedulerRequestKey, + appPlacementAllocator); + } + return appPlacementAllocator; + } + + private boolean internalAddResourceRequests( boolean recoverPreemptedRequestForAContainer, Map<SchedulerRequestKey, Map<String, ResourceRequest>> dedupRequests) { boolean offswitchResourcesUpdated = false; for (Map.Entry<SchedulerRequestKey, Map<String, ResourceRequest>> entry : dedupRequests.entrySet()) { SchedulerRequestKey 
schedulerRequestKey = entry.getKey(); - - if (!schedulerKeyToAppPlacementAllocator - .containsKey(schedulerRequestKey)) { - AppPlacementAllocator<SchedulerNode> placementAllocatorInstance = ApplicationPlacementFactory - .getAppPlacementAllocator(applicationSchedulingEnvs - .get(ApplicationSchedulingConfig.ENV_APPLICATION_PLACEMENT_TYPE_CLASS)); - placementAllocatorInstance.setAppSchedulingInfo(this); - - schedulerKeyToAppPlacementAllocator.put(schedulerRequestKey, - placementAllocatorInstance); - } + AppPlacementAllocator<SchedulerNode> appPlacementAllocator = + getAndAddAppPlacementAllocatorIfNotExist(schedulerRequestKey, + applicationSchedulingEnvs.get( + ApplicationSchedulingConfig.ENV_APPLICATION_PLACEMENT_TYPE_CLASS)); // Update AppPlacementAllocator - PendingAskUpdateResult pendingAmountChanges = schedulerKeyToAppPlacementAllocator - .get(schedulerRequestKey).updatePendingAsk(entry.getValue().values(), + PendingAskUpdateResult pendingAmountChanges = + appPlacementAllocator.updatePendingAsk(entry.getValue().values(), recoverPreemptedRequestForAContainer); if (null != pendingAmountChanges) { @@ -242,6 +327,29 @@ public class AppSchedulingInfo { return offswitchResourcesUpdated; } + private boolean internalAddResourceRequests(boolean recoverPreemptedRequestForAContainer, + List<ResourceRequest> resourceRequests) { + if (null == resourceRequests || resourceRequests.isEmpty()) { + return false; + } + + // A map to group resource requests and dedup + Map<SchedulerRequestKey, Map<String, ResourceRequest>> dedupRequests = + new HashMap<>(); + + // Group resource request by schedulerRequestKey and resourceName + for (ResourceRequest request : resourceRequests) { + SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request); + if (!dedupRequests.containsKey(schedulerKey)) { + dedupRequests.put(schedulerKey, new HashMap<>()); + } + dedupRequests.get(schedulerKey).put(request.getResourceName(), request); + } + + return 
internalAddResourceRequests(recoverPreemptedRequestForAContainer, + dedupRequests); + } + private void updatePendingResources(PendingAskUpdateResult updateResult, SchedulerRequestKey schedulerKey, QueueMetrics metrics) { @@ -629,13 +737,22 @@ public class AppSchedulingInfo { } } - public boolean acceptNodePartition(SchedulerRequestKey schedulerKey, - String nodePartition, SchedulingMode schedulingMode) { + /** + * Pre-check node to see if it satisfies the given schedulerKey and + * scheduling mode + * + * @param schedulerKey schedulerKey + * @param schedulerNode schedulerNode + * @param schedulingMode schedulingMode + * @return can use the node or not. + */ + public boolean precheckNode(SchedulerRequestKey schedulerKey, + SchedulerNode schedulerNode, SchedulingMode schedulingMode) { try { this.readLock.lock(); AppPlacementAllocator ap = schedulerKeyToAppPlacementAllocator.get(schedulerKey); - return (ap != null) && ap.acceptNodePartition(nodePartition, + return (ap != null) && ap.precheckNode(schedulerNode, schedulingMode); } finally { this.readLock.unlock(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationPlacementAllocatorFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationPlacementAllocatorFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationPlacementAllocatorFactory.java new file mode 100644 index 0000000..a4e5484 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationPlacementAllocatorFactory.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; + +/** + * Factory class to build various application placement policies. + */ +@Public +@Unstable +public class ApplicationPlacementAllocatorFactory { + + /** + * Get AppPlacementAllocator related to the placement type requested. + * + * @param appPlacementAllocatorName + * allocator class name. 
+ * @return Specific AppPlacementAllocator instance based on type + */ + public static AppPlacementAllocator<SchedulerNode> getAppPlacementAllocator( + String appPlacementAllocatorName, AppSchedulingInfo appSchedulingInfo, + SchedulerRequestKey schedulerRequestKey, RMContext rmContext) { + Class<?> policyClass; + try { + if (appPlacementAllocatorName == null) { + policyClass = ApplicationSchedulingConfig.DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS; + } else { + policyClass = Class.forName(appPlacementAllocatorName); + } + } catch (ClassNotFoundException e) { + policyClass = ApplicationSchedulingConfig.DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS; + } + + if (!AppPlacementAllocator.class.isAssignableFrom(policyClass)) { + policyClass = ApplicationSchedulingConfig.DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS; + } + + @SuppressWarnings("unchecked") + AppPlacementAllocator<SchedulerNode> placementAllocatorInstance = (AppPlacementAllocator<SchedulerNode>) ReflectionUtils + .newInstance(policyClass, null); + placementAllocatorInstance.initialize(appSchedulingInfo, + schedulerRequestKey, rmContext); + return placementAllocatorInstance; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationPlacementFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationPlacementFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationPlacementFactory.java deleted file mode 100644 index 40c8d05..0000000 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationPlacementFactory.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler; - -import org.apache.hadoop.classification.InterfaceAudience.Public; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator; - -/** - * Factory class to build various application placement policies. - */ -@Public -@Unstable -public class ApplicationPlacementFactory { - - /** - * Get AppPlacementAllocator related to the placement type requested. - * - * @param appPlacementAllocatorName - * allocator class name. 
- * @return Specific AppPlacementAllocator instance based on type - */ - public static AppPlacementAllocator<SchedulerNode> getAppPlacementAllocator( - String appPlacementAllocatorName) { - Class<?> policyClass; - try { - if (appPlacementAllocatorName == null) { - policyClass = ApplicationSchedulingConfig.DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS; - } else { - policyClass = Class.forName(appPlacementAllocatorName); - } - } catch (ClassNotFoundException e) { - policyClass = ApplicationSchedulingConfig.DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS; - } - - if (!AppPlacementAllocator.class.isAssignableFrom(policyClass)) { - policyClass = ApplicationSchedulingConfig.DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS; - } - - @SuppressWarnings("unchecked") - AppPlacementAllocator<SchedulerNode> placementAllocatorInstance = (AppPlacementAllocator<SchedulerNode>) ReflectionUtils - .newInstance(policyClass, null); - return placementAllocatorInstance; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java index f410db1..491a9ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java @@ -146,7 +146,7 @@ public class ContainerUpdateContext { createResourceRequests(rmContainer, schedulerNode, schedulerKey, resToIncrease); updateResReqs.put(schedulerKey, resMap); - appSchedulingInfo.addRequestToAppPlacement(false, updateResReqs); + appSchedulingInfo.updateResourceRequests(updateResReqs, false); } return true; } @@ -290,7 +290,7 @@ public class ContainerUpdateContext { (rmContainer, node, schedulerKey, rmContainer.getContainer().getResource()); reqsToUpdate.put(schedulerKey, resMap); - appSchedulingInfo.addRequestToAppPlacement(true, reqsToUpdate); + appSchedulingInfo.updateResourceRequests(reqsToUpdate, true); return UNDEFINED; } return retVal; http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 3930a35..753c2b8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -56,6 
+56,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerError; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.api.ContainerType; @@ -231,7 +232,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { this.appSchedulingInfo = new AppSchedulingInfo(applicationAttemptId, user, queue, abstractUsersManager, rmContext.getEpoch(), attemptResourceUsage, - applicationSchedulingEnvs); + applicationSchedulingEnvs, rmContext); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); readLock = lock.readLock(); writeLock = lock.writeLock(); @@ -451,6 +452,23 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { writeLock.unlock(); } } + + public boolean updateSchedulingRequests( + List<SchedulingRequest> requests) { + if (requests == null) { + return false; + } + + try { + writeLock.lock(); + if (!isStopped) { + return appSchedulingInfo.updateSchedulingRequests(requests, false); + } + return false; + } finally { + writeLock.unlock(); + } + } public void recoverResourceRequestsForContainer( ContainerRequest containerRequest) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index 93ca7c2..43d55c4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -132,18 +133,19 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> { * * @param appAttemptId * @param ask + * @param schedulingRequests * @param release - * @param blacklistAdditions - * @param blacklistRemovals - * @param updateRequests - * @return the {@link Allocation} for the application + * @param blacklistAdditions + * @param blacklistRemovals + * @param updateRequests + * @return the {@link Allocation} for the application */ @Public @Stable Allocation allocate(ApplicationAttemptId appAttemptId, - List<ResourceRequest> ask, List<ContainerId> release, - List<String> blacklistAdditions, List<String> blacklistRemovals, - ContainerUpdates updateRequests); + List<ResourceRequest> ask, List<SchedulingRequest> schedulingRequests, + List<ContainerId> release, List<String> blacklistAdditions, + List<String> blacklistRemovals, ContainerUpdates updateRequests); /** * Get node resource usage report. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 956d840..0d781ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -58,8 +58,11 @@ import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.ResourceSizing; import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; +import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; @@ -1013,12 +1016,29 @@ public class CapacityScheduler extends } } + /** + * Normalize a list of 
SchedulingRequest + * + * @param asks scheduling request + */ + private void normalizeSchedulingRequests(List<SchedulingRequest> asks) { + if (asks == null) { + return; + } + for (SchedulingRequest ask: asks) { + ResourceSizing sizing = ask.getResourceSizing(); + if (sizing != null && sizing.getResources() != null) { + sizing.setResources(getNormalizedResource(sizing.getResources())); + } + } + } + @Override @Lock(Lock.NoLock.class) public Allocation allocate(ApplicationAttemptId applicationAttemptId, - List<ResourceRequest> ask, List<ContainerId> release, - List<String> blacklistAdditions, List<String> blacklistRemovals, - ContainerUpdates updateRequests) { + List<ResourceRequest> ask, List<SchedulingRequest> schedulingRequests, + List<ContainerId> release, List<String> blacklistAdditions, + List<String> blacklistRemovals, ContainerUpdates updateRequests) { FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId); if (application == null) { LOG.error("Calling allocate on removed or non existent application " + @@ -1026,6 +1046,18 @@ public class CapacityScheduler extends return EMPTY_ALLOCATION; } + if ((!getConfiguration().getBoolean( + CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED, + CapacitySchedulerConfiguration.DEFAULT_SCHEDULING_REQUEST_ALLOWED)) + && schedulingRequests != null && (!schedulingRequests.isEmpty())) { + throw new SchedulerInvalidResoureRequestException( + "Application attempt:" + applicationAttemptId + + " is using SchedulingRequest, which is disabled. Please update " + + CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED + + " to true in capacity-scheduler.xml in order to use this " + + "feature."); + } + // The allocate may be the leftover from previous attempt, and it will // impact current attempt, such as confuse the request and allocation for // current attempt's AM container. 
@@ -1046,7 +1078,10 @@ public class CapacityScheduler extends LeafQueue updateDemandForQueue = null; // Sanity check for new allocation requests - normalizeRequests(ask); + normalizeResourceRequests(ask); + + // Normalize scheduling requests + normalizeSchedulingRequests(schedulingRequests); Allocation allocation; @@ -1059,7 +1094,8 @@ public class CapacityScheduler extends } // Process resource requests - if (!ask.isEmpty()) { + if (!ask.isEmpty() || (schedulingRequests != null && !schedulingRequests + .isEmpty())) { if (LOG.isDebugEnabled()) { LOG.debug( "allocate: pre-update " + applicationAttemptId + " ask size =" @@ -1068,7 +1104,8 @@ public class CapacityScheduler extends } // Update application requests - if (application.updateResourceRequests(ask)) { + if (application.updateResourceRequests(ask) || application + .updateSchedulingRequests(schedulingRequests)) { updateDemandForQueue = (LeafQueue) application.getQueue(); } @@ -2518,10 +2555,9 @@ public class CapacityScheduler extends // Validate placement constraint is satisfied before // committing the request. 
try { - if (!PlacementConstraintsUtil.canSatisfyConstraints( + if (!PlacementConstraintsUtil.canSatisfySingleConstraint( appAttempt.getApplicationId(), - schedulingRequest.getAllocationTags(), - schedulerNode, + schedulingRequest.getAllocationTags(), schedulerNode, rmContext.getPlacementConstraintManager(), rmContext.getAllocationTagsManager())) { LOG.debug("Failed to allocate container for application " http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 8aa41ee..fb133cb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -76,6 +76,11 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur @Private public static final String PREFIX = "yarn.scheduler.capacity."; + + @Private + public static final String SCHEDULING_REQUEST_ALLOWED = + PREFIX + "scheduling-request.allowed"; + public static final boolean DEFAULT_SCHEDULING_REQUEST_ALLOWED = false; @Private public static final String DOT 
= "."; http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index 2642532..afa468b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -143,8 +143,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { // Is the nodePartition of pending request matches the node's partition // If not match, jump to next priority. - if (!appInfo.acceptNodePartition(schedulerKey, node.getPartition(), - schedulingMode)) { + if (!appInfo.precheckNode(schedulerKey, node, schedulingMode)) { ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( activitiesManager, node, application, priority, ActivityDiagnosticConstant. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerRequest.java index 075db79..cad15a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerRequest.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import java.util.List; @@ -43,12 +44,23 @@ import java.util.List; */ public class ContainerRequest { private List<ResourceRequest> requests; + private SchedulingRequest schedulingRequest; public ContainerRequest(List<ResourceRequest> requests) { this.requests = requests; + schedulingRequest = null; + } + + public ContainerRequest(SchedulingRequest schedulingRequest) { + this.schedulingRequest = schedulingRequest; + this.requests = null; } public List<ResourceRequest> getResourceRequests() { return requests; } + + public SchedulingRequest getSchedulingRequest() { + return schedulingRequest; + } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java index 85d8715..2ed3e83 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceSizing; import org.apache.hadoop.yarn.util.resource.Resources; /** @@ -31,6 +32,11 @@ public class PendingAsk { private final int count; public final static PendingAsk ZERO = new PendingAsk(Resources.none(), 0); + public PendingAsk(ResourceSizing sizing) { + this.perAllocationResource = sizing.getResources(); + this.count = sizing.getNumAllocations(); + } + public PendingAsk(Resource res, int num) { this.perAllocationResource = res; this.count = num; http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 4ea0347..7eb1e31 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -542,6 +542,12 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { schedulerContainer.getRmContainer().getContainer()); ((RMContainerImpl) rmContainer).setContainerRequest( containerRequest); + + // If this is from a SchedulingRequest, set allocation tags. 
+ if (containerRequest.getSchedulingRequest() != null) { + ((RMContainerImpl) rmContainer).setAllocationTags( + containerRequest.getSchedulingRequest().getAllocationTags()); + } } attemptResourceUsage.incUsed(schedulerContainer.getNodePartition(), http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java index 4bb3e79..962e548 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.api.resource.PlacementConstraints; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.log4j.Logger; @@ -287,21 +288,15 @@ public class AllocationTagsManager { * {@link SchedulingRequest#getAllocationTags()} * application_id will be added to allocationTags. 
*/ + @SuppressWarnings("unchecked") public void addContainer(NodeId nodeId, ContainerId containerId, Set<String> allocationTags) { + // Do nothing for empty allocation tags. + if (allocationTags == null || allocationTags.isEmpty()) { + return; + } ApplicationId applicationId = containerId.getApplicationAttemptId().getApplicationId(); - String applicationIdTag = - AllocationTagsNamespaces.APP_ID + applicationId.toString(); - - boolean useSet = false; - if (allocationTags != null && !allocationTags.isEmpty()) { - // Copy before edit it. - allocationTags = new HashSet<>(allocationTags); - allocationTags.add(applicationIdTag); - useSet = true; - } - writeLock.lock(); try { TypeToCountedTags perAppTagsMapping = perAppNodeMappings @@ -311,19 +306,12 @@ public class AllocationTagsManager { // Covering test-cases where context is mocked String nodeRack = (rmContext.getRMNodes() != null && rmContext.getRMNodes().get(nodeId) != null) - ? rmContext.getRMNodes().get(nodeId).getRackName() - : "default-rack"; - if (useSet) { - perAppTagsMapping.addTags(nodeId, allocationTags); - perAppRackTagsMapping.addTags(nodeRack, allocationTags); - globalNodeMapping.addTags(nodeId, allocationTags); - globalRackMapping.addTags(nodeRack, allocationTags); - } else { - perAppTagsMapping.addTag(nodeId, applicationIdTag); - perAppRackTagsMapping.addTag(nodeRack, applicationIdTag); - globalNodeMapping.addTag(nodeId, applicationIdTag); - globalRackMapping.addTag(nodeRack, applicationIdTag); - } + ? rmContext.getRMNodes().get(nodeId).getRackName() : + "default-rack"; + perAppTagsMapping.addTags(nodeId, allocationTags); + perAppRackTagsMapping.addTags(nodeRack, allocationTags); + globalNodeMapping.addTags(nodeId, allocationTags); + globalRackMapping.addTags(nodeRack, allocationTags); if (LOG.isDebugEnabled()) { LOG.debug("Added container=" + containerId + " with tags=[" @@ -341,20 +329,15 @@ public class AllocationTagsManager { * @param containerId containerId. 
* @param allocationTags allocation tags for given container */ + @SuppressWarnings("unchecked") public void removeContainer(NodeId nodeId, ContainerId containerId, Set<String> allocationTags) { + // Do nothing for empty allocation tags. + if (allocationTags == null || allocationTags.isEmpty()) { + return; + } ApplicationId applicationId = containerId.getApplicationAttemptId().getApplicationId(); - String applicationIdTag = - AllocationTagsNamespaces.APP_ID + applicationId.toString(); - boolean useSet = false; - - if (allocationTags != null && !allocationTags.isEmpty()) { - // Copy before edit it. - allocationTags = new HashSet<>(allocationTags); - allocationTags.add(applicationIdTag); - useSet = true; - } writeLock.lock(); try { @@ -368,19 +351,12 @@ public class AllocationTagsManager { // Covering test-cases where context is mocked String nodeRack = (rmContext.getRMNodes() != null && rmContext.getRMNodes().get(nodeId) != null) - ? rmContext.getRMNodes().get(nodeId).getRackName() - : "default-rack"; - if (useSet) { - perAppTagsMapping.removeTags(nodeId, allocationTags); - perAppRackTagsMapping.removeTags(nodeRack, allocationTags); - globalNodeMapping.removeTags(nodeId, allocationTags); - globalRackMapping.removeTags(nodeRack, allocationTags); - } else { - perAppTagsMapping.removeTag(nodeId, applicationIdTag); - perAppRackTagsMapping.removeTag(nodeRack, applicationIdTag); - globalNodeMapping.removeTag(nodeId, applicationIdTag); - globalRackMapping.removeTag(nodeRack, applicationIdTag); - } + ? 
rmContext.getRMNodes().get(nodeId).getRackName() : + "default-rack"; + perAppTagsMapping.removeTags(nodeId, allocationTags); + perAppRackTagsMapping.removeTags(nodeRack, allocationTags); + globalNodeMapping.removeTags(nodeId, allocationTags); + globalRackMapping.removeTags(nodeRack, allocationTags); if (perAppTagsMapping.isEmpty()) { perAppNodeMappings.remove(applicationId); @@ -602,6 +578,7 @@ public class AllocationTagsManager { * @throws InvalidAllocationTagsQueryException when illegal query * parameter specified */ + @SuppressWarnings("unchecked") public long getRackCardinalityByOp(String rack, ApplicationId applicationId, Set<String> tags, LongBinaryOperator op) throws InvalidAllocationTagsQueryException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsNamespaces.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsNamespaces.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsNamespaces.java deleted file mode 100644 index 43fcfe5..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsNamespaces.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * / - */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; - -/** - * Predefined namespaces for tags - * - * Same as namespace of resource types. Namespaces of placement tags are start - * with alphabets and ended with "/" - */ -public class AllocationTagsNamespaces { - public static final String APP_ID = "yarn_app_id/"; -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org