YARN-8013. Support application tags when defining application namespaces for placement constraints. Contributed by Weiwei Yang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7853ec8d Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7853ec8d Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7853ec8d Branch: refs/heads/HDFS-7240 Commit: 7853ec8d2fb8731b7f7c28fd87491a0a2d47967e Parents: 42cd367 Author: Konstantinos Karanasos <kkarana...@apache.org> Authored: Wed Apr 4 10:51:58 2018 -0700 Committer: Konstantinos Karanasos <kkarana...@apache.org> Committed: Wed Apr 4 10:51:58 2018 -0700 ---------------------------------------------------------------------- .../api/records/AllocationTagNamespaceType.java | 2 +- .../constraint/AllocationTagNamespace.java | 312 ------------------ .../scheduler/constraint/AllocationTags.java | 44 ++- .../constraint/AllocationTagsManager.java | 47 ++- .../constraint/PlacementConstraintsUtil.java | 41 +-- .../constraint/TargetApplications.java | 53 ++- .../constraint/TargetApplicationsNamespace.java | 326 +++++++++++++++++++ .../SingleConstraintAppPlacementAllocator.java | 21 -- .../server/resourcemanager/rmapp/MockRMApp.java | 9 +- ...estSchedulingRequestContainerAllocation.java | 5 +- .../constraint/TestAllocationTagsManager.java | 22 +- .../constraint/TestAllocationTagsNamespace.java | 89 ++++- .../TestPlacementConstraintsUtil.java | 125 ++++++- 13 files changed, 654 insertions(+), 442 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/7853ec8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java index de5492e..f304600 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java @@ -26,7 +26,7 @@ public enum AllocationTagNamespaceType { SELF("self"), NOT_SELF("not-self"), APP_ID("app-id"), - APP_LABEL("app-label"), + APP_TAG("app-tag"), ALL("all"); private String typeKeyword; http://git-wip-us.apache.org/repos/asf/hadoop/blob/7853ec8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagNamespace.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagNamespace.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagNamespace.java deleted file mode 100644 index 7b9f3be..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagNamespace.java +++ /dev/null @@ -1,312 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; - -import com.google.common.base.Strings; -import com.google.common.collect.ImmutableSet; -import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType; -import org.apache.hadoop.yarn.api.records.ApplicationId; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; - -import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.SELF; -import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.NOT_SELF; -import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.APP_LABEL; -import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.APP_ID; -import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.ALL; - -/** - * Class to describe the namespace of an allocation tag. - * Each namespace can be evaluated against a set of applications. - * After evaluation, the namespace should have an implicit set of - * applications which defines its scope. - */ -public abstract class AllocationTagNamespace implements - Evaluable<TargetApplications> { - - public final static String NAMESPACE_DELIMITER = "/"; - - private AllocationTagNamespaceType nsType; - // Namespace scope value will be delay binding by eval method. 
- private Set<ApplicationId> nsScope; - - public AllocationTagNamespace(AllocationTagNamespaceType - allocationTagNamespaceType) { - this.nsType = allocationTagNamespaceType; - } - - protected void setScopeIfNotNull(Set<ApplicationId> appIds) { - if (appIds != null) { - this.nsScope = appIds; - } - } - - /** - * Get the type of the namespace. - * @return namespace type. - */ - public AllocationTagNamespaceType getNamespaceType() { - return nsType; - } - - /** - * Get the scope of the namespace, in form of a set of applications. - * - * @return a set of applications. - */ - public Set<ApplicationId> getNamespaceScope() { - if (this.nsScope == null) { - throw new IllegalStateException("Invalid namespace scope," - + " it is not initialized. Evaluate must be called before" - + " a namespace can be consumed."); - } - return this.nsScope; - } - - /** - * Evaluate the namespace against given target applications - * if it is necessary. Only self/not-self/app-label namespace types - * require this evaluation step, because they are not binding to a - * specific scope during initiating. So we do lazy binding for them - * in this method. - * - * @param target a generic type target that impacts this evaluation. - * @throws InvalidAllocationTagsQueryException - */ - @Override - public void evaluate(TargetApplications target) - throws InvalidAllocationTagsQueryException { - // Sub-class needs to override this when it requires the eval step. - } - - @Override - public String toString() { - return this.nsType.toString(); - } - - /** - * Namespace within application itself. 
- */ - public static class Self extends AllocationTagNamespace { - - public Self() { - super(SELF); - } - - @Override - public void evaluate(TargetApplications target) - throws InvalidAllocationTagsQueryException { - if (target == null || target.getCurrentApplicationId() == null) { - throw new InvalidAllocationTagsQueryException("Namespace Self must" - + " be evaluated against a single application ID."); - } - ApplicationId applicationId = target.getCurrentApplicationId(); - setScopeIfNotNull(ImmutableSet.of(applicationId)); - } - } - - /** - * Namespace to all applications except itself. - */ - public static class NotSelf extends AllocationTagNamespace { - - private ApplicationId applicationId; - - public NotSelf() { - super(NOT_SELF); - } - - /** - * The scope of self namespace is to an application itself, - * the application ID can be delay binding to the namespace. - * - * @param appId application ID. - */ - public void setApplicationId(ApplicationId appId) { - this.applicationId = appId; - } - - public ApplicationId getApplicationId() { - return this.applicationId; - } - - @Override - public void evaluate(TargetApplications target) { - Set<ApplicationId> otherAppIds = target.getOtherApplicationIds(); - setScopeIfNotNull(otherAppIds); - } - } - - /** - * Namespace to all applications in the cluster. - */ - public static class All extends AllocationTagNamespace { - - public All() { - super(ALL); - } - } - - /** - * Namespace to all applications in the cluster. - */ - public static class AppLabel extends AllocationTagNamespace { - - public AppLabel() { - super(APP_LABEL); - } - - @Override - public void evaluate(TargetApplications target) { - // TODO Implement app-label namespace evaluation - } - } - - /** - * Namespace defined by a certain application ID. - */ - public static class AppID extends AllocationTagNamespace { - - private ApplicationId targetAppId; - // app-id namespace requires an extra value of an application id. 
- public AppID(ApplicationId applicationId) { - super(APP_ID); - this.targetAppId = applicationId; - setScopeIfNotNull(ImmutableSet.of(targetAppId)); - } - - @Override - public String toString() { - return APP_ID.toString() + NAMESPACE_DELIMITER + this.targetAppId; - } - } - - /** - * Parse namespace from a string. The string must be in legal format - * defined by each {@link AllocationTagNamespaceType}. - * - * @param namespaceStr namespace string. - * @return an instance of {@link AllocationTagNamespace}. - * @throws InvalidAllocationTagsQueryException - * if given string is not in valid format - */ - public static AllocationTagNamespace parse(String namespaceStr) - throws InvalidAllocationTagsQueryException { - // Return the default namespace if no valid string is given. - if (Strings.isNullOrEmpty(namespaceStr)) { - return new Self(); - } - - // Normalize the input, escape additional chars. - List<String> nsValues = normalize(namespaceStr); - // The first string should be the prefix. 
- String nsPrefix = nsValues.get(0); - AllocationTagNamespaceType allocationTagNamespaceType = - fromString(nsPrefix); - switch (allocationTagNamespaceType) { - case SELF: - return new Self(); - case NOT_SELF: - return new NotSelf(); - case ALL: - return new All(); - case APP_ID: - if (nsValues.size() != 2) { - throw new InvalidAllocationTagsQueryException( - "Missing the application ID in the namespace string: " - + namespaceStr); - } - String appIDStr = nsValues.get(1); - return parseAppID(appIDStr); - case APP_LABEL: - return new AppLabel(); - default: - throw new InvalidAllocationTagsQueryException( - "Invalid namespace string " + namespaceStr); - } - } - - private static AllocationTagNamespaceType fromString(String prefix) throws - InvalidAllocationTagsQueryException { - for (AllocationTagNamespaceType type : - AllocationTagNamespaceType.values()) { - if(type.getTypeKeyword().equals(prefix)) { - return type; - } - } - - Set<String> values = Arrays.stream(AllocationTagNamespaceType.values()) - .map(AllocationTagNamespaceType::toString) - .collect(Collectors.toSet()); - throw new InvalidAllocationTagsQueryException( - "Invalid namespace prefix: " + prefix - + ", valid values are: " + String.join(",", values)); - } - - private static AllocationTagNamespace parseAppID(String appIDStr) - throws InvalidAllocationTagsQueryException { - try { - ApplicationId applicationId = ApplicationId.fromString(appIDStr); - return new AppID(applicationId); - } catch (IllegalArgumentException e) { - throw new InvalidAllocationTagsQueryException( - "Invalid application ID for " - + APP_ID.getTypeKeyword() + ": " + appIDStr); - } - } - - /** - * Valid given namespace string and parse it to a list of sub-strings - * that can be consumed by the parser according to the type of the - * namespace. Currently the size of return list should be either 1 or 2. - * Extra slash is escaped during the normalization. - * - * @param namespaceStr namespace string. 
- * @return a list of parsed strings. - * @throws InvalidAllocationTagsQueryException - * if namespace format is unexpected. - */ - private static List<String> normalize(String namespaceStr) - throws InvalidAllocationTagsQueryException { - List<String> result = new ArrayList<>(); - if (namespaceStr == null) { - return result; - } - - String[] nsValues = namespaceStr.split(NAMESPACE_DELIMITER); - for (String str : nsValues) { - if (!Strings.isNullOrEmpty(str)) { - result.add(str); - } - } - - // Currently we only allow 1 or 2 values for a namespace string - if (result.size() == 0 || result.size() > 2) { - throw new InvalidAllocationTagsQueryException("Invalid namespace string: " - + namespaceStr + ", the syntax is <namespace_prefix> or" - + " <namespace_prefix>/<namespace_value>"); - } - - return result; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/7853ec8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTags.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTags.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTags.java index dc0237e..5cae92c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTags.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTags.java @@ -19,7 +19,6 @@ package 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableSet; import org.apache.hadoop.yarn.api.records.ApplicationId; import java.util.Set; @@ -29,22 +28,34 @@ import java.util.Set; */ public final class AllocationTags { - private AllocationTagNamespace ns; + private TargetApplicationsNamespace ns; private Set<String> tags; + private ApplicationId applicationId; - private AllocationTags(AllocationTagNamespace namespace, + private AllocationTags(TargetApplicationsNamespace namespace, Set<String> allocationTags) { this.ns = namespace; this.tags = allocationTags; } + private AllocationTags(TargetApplicationsNamespace namespace, + Set<String> allocationTags, ApplicationId currentAppId) { + this.ns = namespace; + this.tags = allocationTags; + this.applicationId = currentAppId; + } + /** * @return the namespace of these tags. */ - public AllocationTagNamespace getNamespace() { + public TargetApplicationsNamespace getNamespace() { return this.ns; } + public ApplicationId getCurrentApplicationId() { + return this.applicationId; + } + /** * @return the allocation tags. 
*/ @@ -55,28 +66,31 @@ public final class AllocationTags { @VisibleForTesting public static AllocationTags createSingleAppAllocationTags( ApplicationId appId, Set<String> tags) { - AllocationTagNamespace namespace = new AllocationTagNamespace.AppID(appId); + TargetApplicationsNamespace namespace = + new TargetApplicationsNamespace.AppID(appId); return new AllocationTags(namespace, tags); } @VisibleForTesting public static AllocationTags createGlobalAllocationTags(Set<String> tags) { - AllocationTagNamespace namespace = new AllocationTagNamespace.All(); + TargetApplicationsNamespace namespace = + new TargetApplicationsNamespace.All(); return new AllocationTags(namespace, tags); } @VisibleForTesting public static AllocationTags createOtherAppAllocationTags( - ApplicationId currentApp, Set<ApplicationId> allIds, Set<String> tags) - throws InvalidAllocationTagsQueryException { - AllocationTagNamespace namespace = new AllocationTagNamespace.NotSelf(); - TargetApplications ta = new TargetApplications(currentApp, allIds); - namespace.evaluate(ta); - return new AllocationTags(namespace, tags); + ApplicationId currentApp, Set<String> tags) { + TargetApplicationsNamespace namespace = + new TargetApplicationsNamespace.NotSelf(); + return new AllocationTags(namespace, tags, currentApp); } - public static AllocationTags newAllocationTags( - AllocationTagNamespace namespace, Set<String> tags) { - return new AllocationTags(namespace, tags); + public static AllocationTags createAllocationTags( + ApplicationId currentApplicationId, String namespaceString, + Set<String> tags) throws InvalidAllocationTagsQueryException { + TargetApplicationsNamespace namespace = TargetApplicationsNamespace + .parse(namespaceString); + return new AllocationTags(namespace, tags, currentApplicationId); } } \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7853ec8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java index 830566a..4fc2bab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java @@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; @@ -32,12 +31,14 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.log4j.Logger; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentMap; import 
java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.LongBinaryOperator; @@ -292,13 +293,21 @@ public class AllocationTagsManager { /** * Aggregates multiple {@link TypeToCountedTags} to a single one based on - * a given set of application IDs, the values are properly merged. + * the scope defined in the allocation tags, the values are properly merged. * - * @param appIds a set of application IDs. + * @param allocationTags {@link AllocationTags}. * @return an aggregated {@link TypeToCountedTags}. */ - private TypeToCountedTags aggregateAllocationTags(Set<ApplicationId> appIds, - Map<ApplicationId, TypeToCountedTags> mapping) { + private TypeToCountedTags aggregateAllocationTags( + AllocationTags allocationTags, + Map<ApplicationId, TypeToCountedTags> mapping) + throws InvalidAllocationTagsQueryException { + // Based on the namespace type of the given allocation tags + TargetApplicationsNamespace namespace = allocationTags.getNamespace(); + TargetApplications ta = new TargetApplications( + allocationTags.getCurrentApplicationId(), getApplicationIdToTags()); + namespace.evaluate(ta); + Set<ApplicationId> appIds = namespace.getNamespaceScope(); TypeToCountedTags result = new TypeToCountedTags(); if (appIds != null) { if (appIds.size() == 1) { @@ -571,9 +580,7 @@ public class AllocationTagsManager { mapping = globalNodeMapping; } else { // Aggregate app tags cardinality by applications. - mapping = aggregateAllocationTags( - tags.getNamespace().getNamespaceScope(), - perAppNodeMappings); + mapping = aggregateAllocationTags(tags, perAppNodeMappings); } return mapping == null ? 0 : @@ -618,9 +625,7 @@ public class AllocationTagsManager { mapping = globalRackMapping; } else { // Aggregates cardinality by rack. - mapping = aggregateAllocationTags( - tags.getNamespace().getNamespaceScope(), - perAppRackMappings); + mapping = aggregateAllocationTags(tags, perAppRackMappings); } return mapping == null ? 
0 : @@ -642,10 +647,22 @@ public class AllocationTagsManager { } /** - * @return all application IDs in a set that currently visible by - * the allocation tags manager. + * @return all applications that are known to the + * {@link AllocationTagsManager}, along with their application tags. + * The result is a map, where key is an application ID, and value is the + * application-tags attached to this application. If no + * application-tag exists for the application, the value is an empty set. */ - public Set<ApplicationId> getAllApplicationIds() { - return ImmutableSet.copyOf(perAppNodeMappings.keySet()); + private Map<ApplicationId, Set<String>> getApplicationIdToTags() { + Map<ApplicationId, Set<String>> result = new HashMap<>(); + ConcurrentMap<ApplicationId, RMApp> allApps = rmContext.getRMApps(); + if (allApps != null) { + for (Map.Entry<ApplicationId, RMApp> app : allApps.entrySet()) { + if (perAppNodeMappings.containsKey(app.getKey())) { + result.put(app.getKey(), app.getValue().getApplicationTags()); + } + } + } + return result; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7853ec8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java index 389fc5c..efa7b65 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java @@ -24,7 +24,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; @@ -58,35 +57,6 @@ public final class PlacementConstraintsUtil { } /** - * Try to the namespace of the allocation tags from the given target key. - * - * @param targetKey - * @return allocation tag namespace. - * @throws InvalidAllocationTagsQueryException - * if fail to parse the target key to a valid namespace. - */ - private static AllocationTagNamespace getAllocationTagNamespace( - ApplicationId currentAppId, String targetKey, AllocationTagsManager atm) - throws InvalidAllocationTagsQueryException { - // Parse to a valid namespace. - AllocationTagNamespace namespace = AllocationTagNamespace.parse(targetKey); - - // TODO Complete remove this check once we support app-label. - if (AllocationTagNamespaceType.APP_LABEL - .equals(namespace.getNamespaceType())) { - throw new InvalidAllocationTagsQueryException( - namespace.toString() + " is not supported yet!"); - } - - // Evaluate the namespace according to the given target - // before it can be consumed. 
- TargetApplications ta = - new TargetApplications(currentAppId, atm.getAllApplicationIds()); - namespace.evaluate(ta); - return namespace; - } - - /** * Returns true if <b>single</b> placement constraint with associated * allocationTags and scope is satisfied by a specific scheduler Node. * @@ -104,13 +74,10 @@ public final class PlacementConstraintsUtil { ApplicationId targetApplicationId, SingleConstraint sc, TargetExpression te, SchedulerNode node, AllocationTagsManager tm) throws InvalidAllocationTagsQueryException { - // Parse the allocation tag's namespace from the given target key, - // then evaluate the namespace and get its scope, - // which is represented by one or more application IDs. - AllocationTagNamespace namespace = getAllocationTagNamespace( - targetApplicationId, te.getTargetKey(), tm); - AllocationTags allocationTags = AllocationTags - .newAllocationTags(namespace, te.getTargetValues()); + // Creates AllocationTags that will be further consumed by allocation + // tags manager for cardinality check. 
+ AllocationTags allocationTags = AllocationTags.createAllocationTags( + targetApplicationId, te.getTargetKey(), te.getTargetValues()); long minScopeCardinality = 0; long maxScopeCardinality = 0; http://git-wip-us.apache.org/repos/asf/hadoop/blob/7853ec8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TargetApplications.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TargetApplications.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TargetApplications.java index 0de7c9e..ab1bd9b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TargetApplications.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TargetApplications.java @@ -18,34 +18,77 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableSet; import org.apache.hadoop.yarn.api.records.ApplicationId; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; /** * This class is used by - * {@link AllocationTagNamespace#evaluate(TargetApplications)} to evaluate + * {@link TargetApplicationsNamespace#evaluate(TargetApplications)} to evaluate * a namespace. 
*/ public class TargetApplications { private ApplicationId currentAppId; - private Set<ApplicationId> allAppIds; + private Map<ApplicationId, Set<String>> allApps; public TargetApplications(ApplicationId currentApplicationId, Set<ApplicationId> allApplicationIds) { this.currentAppId = currentApplicationId; - this.allAppIds = allApplicationIds; + allApps = new HashMap<>(); + if (allApplicationIds != null) { + allApplicationIds.forEach(appId -> + allApps.put(appId, ImmutableSet.of())); + } + } + + public TargetApplications(ApplicationId currentApplicationId, + Map<ApplicationId, Set<String>> allApplicationIds) { + this.currentAppId = currentApplicationId; + this.allApps = allApplicationIds; } public ApplicationId getCurrentApplicationId() { return this.currentAppId; } + public Set<ApplicationId> getAllApplicationIds() { + return this.allApps == null ? + ImmutableSet.of() : allApps.keySet(); + } + public Set<ApplicationId> getOtherApplicationIds() { - return allAppIds == null ? null : allAppIds.stream().filter(appId -> - !appId.equals(getCurrentApplicationId())) + if (getAllApplicationIds() == null + || getAllApplicationIds().isEmpty()) { + return ImmutableSet.of(); + } + return getAllApplicationIds() + .stream() + .filter(appId -> !appId.equals(getCurrentApplicationId())) .collect(Collectors.toSet()); } + + public Set<ApplicationId> getApplicationIdsByTag(String applicationTag) { + Set<ApplicationId> result = new HashSet<>(); + if (Strings.isNullOrEmpty(applicationTag) + || this.allApps == null) { + return result; + } + + for (Map.Entry<ApplicationId, Set<String>> app + : this.allApps.entrySet()) { + if (app.getValue() != null + && app.getValue().contains(applicationTag)) { + result.add(app.getKey()); + } + } + + return result; + } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7853ec8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TargetApplicationsNamespace.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TargetApplicationsNamespace.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TargetApplicationsNamespace.java new file mode 100644 index 0000000..a82a03f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TargetApplicationsNamespace.java @@ -0,0 +1,326 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; + +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableSet; +import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType; +import org.apache.hadoop.yarn.api.records.ApplicationId; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.SELF; +import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.NOT_SELF; +import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.APP_TAG; +import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.APP_ID; +import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.ALL; + +/** + * Class to describe the namespace of allocation tags, used by + * {@link AllocationTags}. Each namespace can be evaluated against + * a target set applications, represented by {@link TargetApplications}. + * After evaluation, the namespace is interpreted to be a set of + * applications based on the namespace type. + */ +public abstract class TargetApplicationsNamespace implements + Evaluable<TargetApplications> { + + public final static String NAMESPACE_DELIMITER = "/"; + + private AllocationTagNamespaceType nsType; + // Namespace scope value will be delay binding by eval method. + private Set<ApplicationId> nsScope; + + public TargetApplicationsNamespace(AllocationTagNamespaceType + allocationTagNamespaceType) { + this.nsType = allocationTagNamespaceType; + } + + protected void setScopeIfNotNull(Set<ApplicationId> appIds) { + if (appIds != null) { + this.nsScope = appIds; + } + } + + /** + * Get the type of the namespace. + * @return namespace type. 
+ */ + public AllocationTagNamespaceType getNamespaceType() { + return nsType; + } + + /** + * Get the scope of the namespace, in form of a set of applications. + * + * @return a set of applications. + */ + public Set<ApplicationId> getNamespaceScope() { + if (this.nsScope == null) { + throw new IllegalStateException("Invalid namespace scope," + + " it is not initialized. Evaluate must be called before" + + " a namespace can be consumed."); + } + return this.nsScope; + } + + /** + * Evaluate the namespace against given target applications + * if it is necessary. Only self/not-self/app-tag namespace types + * require this evaluation step, because they are not bound to a + * specific scope at construction time. So we do lazy binding for them + * in this method. + * + * @param target a generic type target that impacts this evaluation. + * @throws InvalidAllocationTagsQueryException + */ + @Override + public void evaluate(TargetApplications target) + throws InvalidAllocationTagsQueryException { + // Sub-class needs to override this when it requires the eval step. + } + + @Override + public String toString() { + return this.nsType.toString(); + } + + /** + * Namespace within application itself. + */ + public static class Self extends TargetApplicationsNamespace { + + public Self() { + super(SELF); + } + + @Override + public void evaluate(TargetApplications target) + throws InvalidAllocationTagsQueryException { + if (target == null || target.getCurrentApplicationId() == null) { + throw new InvalidAllocationTagsQueryException("Namespace Self must" + + " be evaluated against a single application ID."); + } + ApplicationId applicationId = target.getCurrentApplicationId(); + setScopeIfNotNull(ImmutableSet.of(applicationId)); + } + } + + /** + * Namespace to all applications except itself. 
+ */ + public static class NotSelf extends TargetApplicationsNamespace { + + private ApplicationId applicationId; + + public NotSelf() { + super(NOT_SELF); + } + + /** + * The scope of self namespace is to an application itself, + * the application ID can be delay binding to the namespace. + * + * @param appId application ID. + */ + public void setApplicationId(ApplicationId appId) { + this.applicationId = appId; + } + + public ApplicationId getApplicationId() { + return this.applicationId; + } + + @Override + public void evaluate(TargetApplications target) { + Set<ApplicationId> otherAppIds = target.getOtherApplicationIds(); + setScopeIfNotNull(otherAppIds); + } + } + + /** + * Namespace to all applications in the cluster. + */ + public static class All extends TargetApplicationsNamespace { + + public All() { + super(ALL); + } + } + + /** + * Namespace to applications that attached with a certain application tag. + */ + public static class AppTag extends TargetApplicationsNamespace { + + private String applicationTag; + + public AppTag(String appTag) { + super(APP_TAG); + this.applicationTag = appTag; + } + + @Override + public void evaluate(TargetApplications target) { + setScopeIfNotNull(target.getApplicationIdsByTag(applicationTag)); + } + + @Override + public String toString() { + return APP_TAG.toString() + NAMESPACE_DELIMITER + this.applicationTag; + } + } + + /** + * Namespace defined by a certain application ID. + */ + public static class AppID extends TargetApplicationsNamespace { + + private ApplicationId targetAppId; + // app-id namespace requires an extra value of an application id. + public AppID(ApplicationId applicationId) { + super(APP_ID); + this.targetAppId = applicationId; + setScopeIfNotNull(ImmutableSet.of(targetAppId)); + } + + @Override + public String toString() { + return APP_ID.toString() + NAMESPACE_DELIMITER + this.targetAppId; + } + } + + /** + * Parse namespace from a string. 
The string must be in legal format + * defined by each {@link AllocationTagNamespaceType}. + * + * @param namespaceStr namespace string. + * @return an instance of {@link TargetApplicationsNamespace}. + * @throws InvalidAllocationTagsQueryException + * if given string is not in valid format + */ + public static TargetApplicationsNamespace parse(String namespaceStr) + throws InvalidAllocationTagsQueryException { + // Return the default namespace if no valid string is given. + if (Strings.isNullOrEmpty(namespaceStr)) { + return new Self(); + } + + // Normalize the input, escape additional chars. + List<String> nsValues = normalize(namespaceStr); + // The first string should be the prefix. + String nsPrefix = nsValues.get(0); + AllocationTagNamespaceType allocationTagNamespaceType = + fromString(nsPrefix); + switch (allocationTagNamespaceType) { + case SELF: + return new Self(); + case NOT_SELF: + return new NotSelf(); + case ALL: + return new All(); + case APP_ID: + if (nsValues.size() != 2) { + throw new InvalidAllocationTagsQueryException( + "Missing the application ID in the namespace string: " + + namespaceStr); + } + String appIDStr = nsValues.get(1); + return parseAppID(appIDStr); + case APP_TAG: + if (nsValues.size() != 2) { + throw new InvalidAllocationTagsQueryException( + "Missing the application tag in the namespace string: " + + namespaceStr); + } + return new AppTag(nsValues.get(1)); + default: + throw new InvalidAllocationTagsQueryException( + "Invalid namespace string " + namespaceStr); + } + } + + private static AllocationTagNamespaceType fromString(String prefix) throws + InvalidAllocationTagsQueryException { + for (AllocationTagNamespaceType type : + AllocationTagNamespaceType.values()) { + if(type.getTypeKeyword().equals(prefix)) { + return type; + } + } + + Set<String> values = Arrays.stream(AllocationTagNamespaceType.values()) + .map(AllocationTagNamespaceType::toString) + .collect(Collectors.toSet()); + throw new 
InvalidAllocationTagsQueryException( + "Invalid namespace prefix: " + prefix + + ", valid values are: " + String.join(",", values)); + } + + private static TargetApplicationsNamespace parseAppID(String appIDStr) + throws InvalidAllocationTagsQueryException { + try { + ApplicationId applicationId = ApplicationId.fromString(appIDStr); + return new AppID(applicationId); + } catch (IllegalArgumentException e) { + throw new InvalidAllocationTagsQueryException( + "Invalid application ID for " + + APP_ID.getTypeKeyword() + ": " + appIDStr); + } + } + + /** + * Validate the given namespace string and parse it to a list of sub-strings + * that can be consumed by the parser according to the type of the + * namespace. Currently the size of return list should be either 1 or 2. + * Empty segments produced by extra slashes are dropped during normalization. + * + * @param namespaceStr namespace string. + * @return a list of parsed strings. + * @throws InvalidAllocationTagsQueryException + * if namespace format is unexpected. + */ + private static List<String> normalize(String namespaceStr) + throws InvalidAllocationTagsQueryException { + List<String> result = new ArrayList<>(); + if (namespaceStr == null) { + return result; + } + + String[] nsValues = namespaceStr.split(NAMESPACE_DELIMITER); + for (String str : nsValues) { + if (!Strings.isNullOrEmpty(str)) { + result.add(str); + } + } + + // Currently we only allow 1 or 2 values for a namespace string + if (result.size() == 0 || result.size() > 2) { + throw new InvalidAllocationTagsQueryException("Invalid namespace string: " + + namespaceStr + ", the syntax is <namespace_prefix> or" + + " <namespace_prefix>/<namespace_value>"); + } + + return result; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/7853ec8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java index 9004110..1fc6bad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java @@ -23,8 +23,6 @@ import org.apache.commons.collections.IteratorUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagNamespace; -import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceSizing; @@ -334,25 +332,6 @@ public class SingleConstraintAppPlacementAllocator<N extends SchedulerNode> targetAllocationTags = new HashSet<>( targetExpression.getTargetValues()); - - try { - AllocationTagNamespace tagNS = - AllocationTagNamespace.parse(targetExpression.getTargetKey()); - if (AllocationTagNamespaceType.APP_LABEL - .equals(tagNS.getNamespaceType())) { - throwExceptionWithMetaInfo( - "As of now, allocation tag namespace [" - + AllocationTagNamespaceType.APP_LABEL.toString() - + "] 
is not supported. Please make changes to placement " - + "constraints accordingly. If this is null, it will be " - + "set to " - + AllocationTagNamespaceType.SELF.toString() - + " by default."); - } - } catch (InvalidAllocationTagsQueryException e) { - throwExceptionWithMetaInfo( - "Invalid allocation tag namespace, message: " + e.getMessage()); - } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7853ec8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index c399368..664fae2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -68,6 +68,7 @@ public class MockRMApp implements RMApp { RMAppAttempt attempt; int maxAppAttempts = 1; List<ResourceRequest> amReqs; + private Set<String> applicationTags = null; public MockRMApp(int newid, long time, RMAppState newState) { finish = time; @@ -82,6 +83,12 @@ public class MockRMApp implements RMApp { user = userName; } + public MockRMApp(int newid, long time, RMAppState newState, + String userName, Set<String> appTags) { + this(newid, time, newState, userName); + this.applicationTags = appTags; + } + public MockRMApp(int newid, long time, RMAppState newState, String userName, String diag) { 
this(newid, time, newState, userName); this.diagnostics = new StringBuilder(diag); @@ -248,7 +255,7 @@ public class MockRMApp implements RMApp { @Override public Set<String> getApplicationTags() { - return null; + return this.applicationTags; } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/7853ec8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java index d7124bb..13247a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java @@ -20,7 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import com.google.common.collect.ImmutableSet; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagNamespace; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.TargetApplicationsNamespace; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import 
org.apache.hadoop.yarn.api.records.ResourceSizing; @@ -294,7 +294,8 @@ public class TestSchedulingRequestContainerAllocation { // App2 asks for 3 containers that anti-affinity with any mapper, // since 3 out of 4 nodes already have mapper containers, all 3 // containers will be allocated on the other node. - AllocationTagNamespace.All allNs = new AllocationTagNamespace.All(); + TargetApplicationsNamespace.All allNs = + new TargetApplicationsNamespace.All(); am2.allocateAppAntiAffinity( ResourceSizing.newInstance(3, Resource.newInstance(1024, 1)), Priority.newInstance(1), 1L, allNs.toString(), http://git-wip-us.apache.org/repos/asf/hadoop/blob/7853ec8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java index cbf5968..3f2aaed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java @@ -27,13 +27,19 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import 
org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; /** * Test functionality of AllocationTagsManager. @@ -468,15 +474,27 @@ public class TestAllocationTagsManager { @Test public void testNodeAllocationTagsAggregation() throws InvalidAllocationTagsQueryException { + RMContext mockContext = Mockito.spy(rmContext); - AllocationTagsManager atm = new AllocationTagsManager(rmContext); ApplicationId app1 = TestUtils.getMockApplicationId(1); ApplicationId app2 = TestUtils.getMockApplicationId(2); ApplicationId app3 = TestUtils.getMockApplicationId(3); + NodeId host1 = NodeId.fromString("host1:123"); NodeId host2 = NodeId.fromString("host2:123"); NodeId host3 = NodeId.fromString("host3:123"); + ConcurrentMap<ApplicationId, RMApp> allApps = new ConcurrentHashMap<>(); + allApps.put(app1, new MockRMApp(123, 1000, + RMAppState.NEW, "userA", ImmutableSet.of(""))); + allApps.put(app2, new MockRMApp(124, 1001, + RMAppState.NEW, "userA", ImmutableSet.of(""))); + allApps.put(app3, new MockRMApp(125, 1002, + RMAppState.NEW, "userA", ImmutableSet.of(""))); + Mockito.when(mockContext.getRMApps()).thenReturn(allApps); + + AllocationTagsManager atm = new AllocationTagsManager(mockContext); + /** * Node1 (rack0) * app1/A(2) @@ -561,7 +579,7 @@ public class TestAllocationTagsManager { * */ tags = AllocationTags.createOtherAppAllocationTags(app1, - ImmutableSet.of(app1, app2, app3), ImmutableSet.of("A", "B")); + 
ImmutableSet.of("A", "B")); Assert.assertEquals(4, atm.getNodeCardinalityByOp(host1, tags, Long::max)); Assert.assertEquals(0, atm.getNodeCardinalityByOp(host1, tags, Long::min)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/7853ec8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsNamespace.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsNamespace.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsNamespace.java index d1ef331..27a121a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsNamespace.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsNamespace.java @@ -16,50 +16,67 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; /** * limitations under the License. */ import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.junit.Assert; import org.junit.Test; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + /** - * Test class for {@link AllocationTagNamespace}. + * Test class for {@link TargetApplicationsNamespace}. 
*/ public class TestAllocationTagsNamespace { @Test public void testNamespaceParse() throws InvalidAllocationTagsQueryException { - AllocationTagNamespace namespace; + TargetApplicationsNamespace namespace; String namespaceStr = "self"; - namespace = AllocationTagNamespace.parse(namespaceStr); + namespace = TargetApplicationsNamespace.parse(namespaceStr); Assert.assertEquals(AllocationTagNamespaceType.SELF, namespace.getNamespaceType()); namespaceStr = "not-self"; - namespace = AllocationTagNamespace.parse(namespaceStr); + namespace = TargetApplicationsNamespace.parse(namespaceStr); Assert.assertEquals(AllocationTagNamespaceType.NOT_SELF, namespace.getNamespaceType()); namespaceStr = "all"; - namespace = AllocationTagNamespace.parse(namespaceStr); + namespace = TargetApplicationsNamespace.parse(namespaceStr); Assert.assertEquals(AllocationTagNamespaceType.ALL, namespace.getNamespaceType()); - namespaceStr = "app-label"; - namespace = AllocationTagNamespace.parse(namespaceStr); - Assert.assertEquals(AllocationTagNamespaceType.APP_LABEL, + namespaceStr = "app-tag/spark-jobs"; + namespace = TargetApplicationsNamespace.parse(namespaceStr); + Assert.assertEquals(AllocationTagNamespaceType.APP_TAG, namespace.getNamespaceType()); + // Invalid app-tag namespace syntax + try { + namespaceStr = "app-tag/tag123/tag234"; + TargetApplicationsNamespace.parse(namespaceStr); + Assert.fail("Parsing should fail as the given namespace is invalid"); + } catch (Exception e) { + e.printStackTrace(); + Assert.assertTrue(e instanceof InvalidAllocationTagsQueryException); + Assert.assertTrue(e.getMessage().startsWith( + "Invalid namespace string")); + } + ApplicationId applicationId = ApplicationId.newInstance(12345, 1); namespaceStr = "app-id/" + applicationId.toString(); - namespace = AllocationTagNamespace.parse(namespaceStr); + namespace = TargetApplicationsNamespace.parse(namespaceStr); Assert.assertEquals(AllocationTagNamespaceType.APP_ID, namespace.getNamespaceType()); // Invalid 
app-id namespace syntax, invalid app ID. try { namespaceStr = "app-id/apppppp_12345_99999"; - AllocationTagNamespace.parse(namespaceStr); + TargetApplicationsNamespace.parse(namespaceStr); Assert.fail("Parsing should fail as the given app ID is invalid"); } catch (Exception e) { Assert.assertTrue(e instanceof InvalidAllocationTagsQueryException); @@ -70,7 +87,7 @@ public class TestAllocationTagsNamespace { // Invalid app-id namespace syntax, missing app ID. try { namespaceStr = "app-id"; - AllocationTagNamespace.parse(namespaceStr); + TargetApplicationsNamespace.parse(namespaceStr); Assert.fail("Parsing should fail as the given namespace" + " is missing application ID"); } catch (Exception e) { @@ -82,7 +99,7 @@ public class TestAllocationTagsNamespace { // Invalid namespace type. try { namespaceStr = "non_exist_ns"; - AllocationTagNamespace.parse(namespaceStr); + TargetApplicationsNamespace.parse(namespaceStr); Assert.fail("Parsing should fail as the giving type is not supported."); } catch (Exception e) { Assert.assertTrue(e instanceof InvalidAllocationTagsQueryException); @@ -94,7 +111,7 @@ public class TestAllocationTagsNamespace { @Test public void testNamespaceEvaluation() throws InvalidAllocationTagsQueryException { - AllocationTagNamespace namespace; + TargetApplicationsNamespace namespace; TargetApplications targetApplications; ApplicationId app1 = ApplicationId.newInstance(10000, 1); ApplicationId app2 = ApplicationId.newInstance(10000, 2); @@ -104,7 +121,7 @@ public class TestAllocationTagsNamespace { // Ensure eval is called before using the scope. 
String namespaceStr = "self"; - namespace = AllocationTagNamespace.parse(namespaceStr); + namespace = TargetApplicationsNamespace.parse(namespaceStr); try { namespace.getNamespaceScope(); Assert.fail("Call getNamespaceScope before evaluate is not allowed."); @@ -115,14 +132,14 @@ public class TestAllocationTagsNamespace { } namespaceStr = "self"; - namespace = AllocationTagNamespace.parse(namespaceStr); + namespace = TargetApplicationsNamespace.parse(namespaceStr); targetApplications = new TargetApplications(app1, ImmutableSet.of(app1)); namespace.evaluate(targetApplications); Assert.assertEquals(1, namespace.getNamespaceScope().size()); Assert.assertEquals(app1, namespace.getNamespaceScope().iterator().next()); namespaceStr = "not-self"; - namespace = AllocationTagNamespace.parse(namespaceStr); + namespace = TargetApplicationsNamespace.parse(namespaceStr); targetApplications = new TargetApplications(app1, ImmutableSet.of(app1)); namespace.evaluate(targetApplications); Assert.assertEquals(0, namespace.getNamespaceScope().size()); @@ -134,16 +151,52 @@ public class TestAllocationTagsNamespace { Assert.assertFalse(namespace.getNamespaceScope().contains(app1)); namespaceStr = "all"; - namespace = AllocationTagNamespace.parse(namespaceStr); + namespace = TargetApplicationsNamespace.parse(namespaceStr); Assert.assertEquals(AllocationTagNamespaceType.ALL, namespace.getNamespaceType()); namespaceStr = "app-id/" + app2.toString(); - namespace = AllocationTagNamespace.parse(namespaceStr); + namespace = TargetApplicationsNamespace.parse(namespaceStr); targetApplications = new TargetApplications(app1, ImmutableSet.of(app1, app2, app3, app4, app5)); namespace.evaluate(targetApplications); Assert.assertEquals(1, namespace.getNamespaceScope().size()); Assert.assertEquals(app2, namespace.getNamespaceScope().iterator().next()); + + /** + * App to Application Tags + * app1: A, B + * app2: A + * app3: + * app4: C + * app5: A, B, C + */ + Map<ApplicationId, Set<String>> appsWithTags 
= new HashMap<>(); + appsWithTags.put(app1, ImmutableSet.of("A", "B")); + appsWithTags.put(app2, ImmutableSet.of("A")); + appsWithTags.put(app3, ImmutableSet.of()); + appsWithTags.put(app4, ImmutableSet.of("C")); + appsWithTags.put(app5, ImmutableSet.of("A", "B", "C")); + + namespaceStr = "app-tag/A"; + namespace = TargetApplicationsNamespace.parse(namespaceStr); + targetApplications = new TargetApplications(app1, appsWithTags); + namespace.evaluate(targetApplications); + Assert.assertEquals(3, namespace.getNamespaceScope().size()); + Assert.assertTrue(Sets.difference(namespace.getNamespaceScope(), + ImmutableSet.of(app1, app2, app5)).isEmpty()); + + namespaceStr = "app-tag/B"; + namespace = TargetApplicationsNamespace.parse(namespaceStr); + namespace.evaluate(targetApplications); + Assert.assertEquals(2, namespace.getNamespaceScope().size()); + Assert.assertTrue(Sets.difference(namespace.getNamespaceScope(), + ImmutableSet.of(app1, app5)).isEmpty()); + + // Not exist + namespaceStr = "app-tag/xyz"; + namespace = TargetApplicationsNamespace.parse(namespaceStr); + namespace.evaluate(targetApplications); + Assert.assertEquals(0, namespace.getNamespaceScope().size()); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/7853ec8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java index 4814321..3248450 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java @@ -37,6 +37,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.concurrent.atomic.AtomicLong; @@ -52,6 +54,9 @@ import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.ResourceSizing; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; @@ -63,6 +68,7 @@ import org.junit.Before; import org.junit.Test; import com.google.common.collect.ImmutableSet; +import org.mockito.Mockito; /** * Test the PlacementConstraint Utility class functionality. 
@@ -562,8 +568,8 @@ public class TestPlacementConstraintsUtil { SchedulerNode schedulerNode3 = newSchedulerNode(n3r2.getHostName(), n3r2.getRackName(), n3r2.getNodeID()); - AllocationTagNamespace namespaceAll = - new AllocationTagNamespace.All(); + TargetApplicationsNamespace namespaceAll = + new TargetApplicationsNamespace.All(); //*************************** // 1) all, anti-affinity @@ -648,17 +654,28 @@ public class TestPlacementConstraintsUtil { @Test public void testNotSelfAppConstraints() throws InvalidAllocationTagsQueryException { - AllocationTagsManager tm = new AllocationTagsManager(rmContext); - PlacementConstraintManagerService pcm = - new MemoryPlacementConstraintManager(); - rmContext.setAllocationTagsManager(tm); - rmContext.setPlacementConstraintManager(pcm); - long ts = System.currentTimeMillis(); ApplicationId application1 = BuilderUtils.newApplicationId(ts, 100); ApplicationId application2 = BuilderUtils.newApplicationId(ts, 101); ApplicationId application3 = BuilderUtils.newApplicationId(ts, 102); + ConcurrentMap<ApplicationId, RMApp> allApps = new ConcurrentHashMap<>(); + allApps.put(application1, new MockRMApp(123, 1000, + RMAppState.NEW, "userA", ImmutableSet.of(""))); + allApps.put(application2, new MockRMApp(124, 1001, + RMAppState.NEW, "userA", ImmutableSet.of(""))); + allApps.put(application3, new MockRMApp(125, 1002, + RMAppState.NEW, "userA", ImmutableSet.of(""))); + + RMContext mockedContext = Mockito.spy(rmContext); + when(mockedContext.getRMApps()).thenReturn(allApps); + + AllocationTagsManager tm = new AllocationTagsManager(mockedContext); + PlacementConstraintManagerService pcm = + new MemoryPlacementConstraintManager(); + mockedContext.setAllocationTagsManager(tm); + mockedContext.setPlacementConstraintManager(pcm); + // Register App1 with anti-affinity constraint map. 
RMNode n0r1 = rmNodes.get(0); RMNode n1r1 = rmNodes.get(1); @@ -696,8 +713,8 @@ public class TestPlacementConstraintsUtil { SchedulerNode schedulerNode3 = newSchedulerNode(n3r2.getHostName(), n3r2.getRackName(), n3r2.getNodeID()); - AllocationTagNamespace notSelf = - new AllocationTagNamespace.NotSelf(); + TargetApplicationsNamespace notSelf = + new TargetApplicationsNamespace.NotSelf(); //*************************** // 1) not-self, app1 @@ -800,8 +817,8 @@ public class TestPlacementConstraintsUtil { SchedulerNode schedulerNode3 =newSchedulerNode(n3r2.getHostName(), n3r2.getRackName(), n3r2.getNodeID()); - AllocationTagNamespace namespace = - new AllocationTagNamespace.AppID(application1); + TargetApplicationsNamespace namespace = + new TargetApplicationsNamespace.AppID(application1); Map<Set<String>, PlacementConstraint> constraintMap = new HashMap<>(); PlacementConstraint constraint2 = PlacementConstraints .targetNotIn(NODE, allocationTagWithNamespace(namespace.toString(), @@ -832,7 +849,7 @@ public class TestPlacementConstraintsUtil { // Intra-app constraint // Test with default and empty namespace - AllocationTagNamespace self = new AllocationTagNamespace.Self(); + TargetApplicationsNamespace self = new TargetApplicationsNamespace.Self(); PlacementConstraint constraint3 = PlacementConstraints .targetNotIn(NODE, allocationTagWithNamespace(self.toString(), "hbase-m")) @@ -873,6 +890,88 @@ public class TestPlacementConstraintsUtil { } @Test + public void testInterAppConstriantsByAppTag() + throws InvalidAllocationTagsQueryException { + + ApplicationId application1 = BuilderUtils.newApplicationId(1000, 123); + ApplicationId application2 = BuilderUtils.newApplicationId(1001, 124); + + // app1: test-tag + // app2: N/A + RMContext mockedContext = Mockito.spy(rmContext); + ConcurrentMap<ApplicationId, RMApp> allApps = new ConcurrentHashMap<>(); + allApps.put(application1, new MockRMApp(123, 1000, + RMAppState.NEW, "userA", ImmutableSet.of("test-tag"))); + 
allApps.put(application2, new MockRMApp(124, 1001, + RMAppState.NEW, "userA", ImmutableSet.of(""))); + when(mockedContext.getRMApps()).thenReturn(allApps); + + AllocationTagsManager tm = new AllocationTagsManager(mockedContext); + PlacementConstraintManagerService pcm = + new MemoryPlacementConstraintManager(); + mockedContext.setAllocationTagsManager(tm); + mockedContext.setPlacementConstraintManager(pcm); + + // Register App1 with anti-affinity constraint map. + RMNode n0r1 = rmNodes.get(0); + RMNode n1r1 = rmNodes.get(1); + RMNode n2r2 = rmNodes.get(2); + RMNode n3r2 = rmNodes.get(3); + + /** + * Place container: + * n0: app1/hbase-m(1) + * n1: "" + * n2: app1/hbase-m(1) + * n3: "" + */ + tm.addContainer(n0r1.getNodeID(), + newContainerId(application1), ImmutableSet.of("hbase-m")); + tm.addContainer(n2r2.getNodeID(), + newContainerId(application1), ImmutableSet.of("hbase-m")); + + SchedulerNode schedulerNode0 = newSchedulerNode(n0r1.getHostName(), + n0r1.getRackName(), n0r1.getNodeID()); + SchedulerNode schedulerNode1 = newSchedulerNode(n1r1.getHostName(), + n1r1.getRackName(), n1r1.getNodeID()); + SchedulerNode schedulerNode2 = newSchedulerNode(n2r2.getHostName(), + n2r2.getRackName(), n2r2.getNodeID()); + SchedulerNode schedulerNode3 = newSchedulerNode(n3r2.getHostName(), + n3r2.getRackName(), n3r2.getNodeID()); + + TargetApplicationsNamespace namespace = + new TargetApplicationsNamespace.AppTag("test-tag"); + Map<Set<String>, PlacementConstraint> constraintMap = new HashMap<>(); + PlacementConstraint constraint2 = PlacementConstraints + .targetNotIn(NODE, allocationTagWithNamespace(namespace.toString(), + "hbase-m")) + .build(); + Set<String> srcTags2 = ImmutableSet.of("app2"); + constraintMap.put(srcTags2, constraint2); + + pcm.registerApplication(application2, constraintMap); + + // Anti-affinity with app-tag/test-tag/hbase-m: + // app1 has tag "test-tag", so the constraint applies to app1's containers; + // app2 fits only on n1 and n3, as they don't have "hbase-m" from
app1. + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints( + application2, createSchedulingRequest(srcTags2), + schedulerNode0, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints( + application2, createSchedulingRequest(srcTags2), + schedulerNode1, pcm, tm)); + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints( + application2, createSchedulingRequest(srcTags2), + schedulerNode2, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints( + application2, createSchedulingRequest(srcTags2), + schedulerNode3, pcm, tm)); + + pcm.unregisterApplication(application1); + pcm.unregisterApplication(application2); + } + + @Test public void testInvalidAllocationTagNamespace() { AllocationTagsManager tm = new AllocationTagsManager(rmContext); PlacementConstraintManagerService pcm = --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org