YARN-7707. [GPG] Policy generator framework. Contributed by Young Chen
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6800cf70 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6800cf70 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6800cf70 Branch: refs/heads/YARN-7402 Commit: 6800cf7015d81cc0085ad0f9159e246842e72187 Parents: f833e1b Author: Botong Huang <bot...@apache.org> Authored: Fri Mar 23 17:07:10 2018 -0700 Committer: Botong Huang <bot...@apache.org> Committed: Thu Aug 2 09:59:48 2018 -0700 ---------------------------------------------------------------------- .../hadoop/yarn/conf/YarnConfiguration.java | 36 +- .../src/main/resources/yarn-default.xml | 40 +++ .../utils/FederationStateStoreFacade.java | 13 + .../pom.xml | 18 + .../globalpolicygenerator/GPGContext.java | 4 + .../globalpolicygenerator/GPGContextImpl.java | 10 + .../globalpolicygenerator/GPGPolicyFacade.java | 220 ++++++++++++ .../server/globalpolicygenerator/GPGUtils.java | 80 +++++ .../GlobalPolicyGenerator.java | 17 + .../policygenerator/GlobalPolicy.java | 76 +++++ .../policygenerator/NoOpGlobalPolicy.java | 36 ++ .../policygenerator/PolicyGenerator.java | 261 ++++++++++++++ .../UniformWeightedLocalityGlobalPolicy.java | 71 ++++ .../policygenerator/package-info.java | 24 ++ .../TestGPGPolicyFacade.java | 202 +++++++++++ .../policygenerator/TestPolicyGenerator.java | 338 +++++++++++++++++++ .../src/test/resources/schedulerInfo1.json | 134 ++++++++ .../src/test/resources/schedulerInfo2.json | 196 +++++++++++ 18 files changed, 1775 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index ec88411..61535fc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -3342,7 +3342,7 @@ public class YarnConfiguration extends Configuration { public static final boolean DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED = false; - private static final String FEDERATION_GPG_PREFIX = + public static final String FEDERATION_GPG_PREFIX = FEDERATION_PREFIX + "gpg."; // The number of threads to use for the GPG scheduled executor service @@ -3360,6 +3360,40 @@ public class YarnConfiguration extends Configuration { FEDERATION_GPG_PREFIX + "subcluster.heartbeat.expiration-ms"; public static final long DEFAULT_GPG_SUBCLUSTER_EXPIRATION_MS = 1800000; + public static final String FEDERATION_GPG_POLICY_PREFIX = + FEDERATION_GPG_PREFIX + "policy.generator."; + + /** The interval at which the policy generator runs, default is one hour. */ + public static final String GPG_POLICY_GENERATOR_INTERVAL_MS = + FEDERATION_GPG_POLICY_PREFIX + "interval-ms"; + public static final long DEFAULT_GPG_POLICY_GENERATOR_INTERVAL_MS = -1; + + /** + * The configured policy generator class, runs NoOpGlobalPolicy by + * default. + */ + public static final String GPG_GLOBAL_POLICY_CLASS = + FEDERATION_GPG_POLICY_PREFIX + "class"; + public static final String DEFAULT_GPG_GLOBAL_POLICY_CLASS = + "org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator." + + "NoOpGlobalPolicy"; + + /** + * Whether or not the policy generator is running in read only (won't modify + * policies), default is false. 
+ */ + public static final String GPG_POLICY_GENERATOR_READONLY = + FEDERATION_GPG_POLICY_PREFIX + "readonly"; + public static final boolean DEFAULT_GPG_POLICY_GENERATOR_READONLY = + false; + + /** + * Which sub-clusters the policy generator should blacklist. + */ + public static final String GPG_POLICY_GENERATOR_BLACKLIST = + FEDERATION_GPG_POLICY_PREFIX + "blacklist"; + + //////////////////////////////// // Other Configs //////////////////////////////// http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 66493f3..755f3e5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3557,6 +3557,38 @@ <property> <description> + The interval at which the policy generator runs, default is one hour + </description> + <name>yarn.federation.gpg.policy.generator.interval-ms</name> + <value>3600000</value> + </property> + + <property> + <description> + The configured policy generator class, runs NoOpGlobalPolicy by default + </description> + <name>yarn.federation.gpg.policy.generator.class</name> + <value>org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.NoOpGlobalPolicy</value> + </property> + + <property> + <description> + Whether or not the policy generator is running in read only (won't modify policies), default is 
false + </description> + <name>yarn.federation.gpg.policy.generator.readonly</name> + <value>false</value> + </property> + + <property> + <description> + Which subclusters the gpg should blacklist, default is none + </description> + <name>yarn.federation.gpg.policy.generator.blacklist</name> + <value></value> + </property> + + <property> + <description> It is TimelineClient 1.5 configuration whether to store active application's timeline data with in user directory i.e ${yarn.timeline-service.entity-group-fs-store.active-dir}/${user.name} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java index 4c3bed0..25a9e52 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java @@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolic import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse; +import 
org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; @@ -373,6 +374,18 @@ public final class FederationStateStoreFacade { } /** + * Set a policy configuration into the state store. + * + * @param policyConf the policy configuration to set + * @throws YarnException if the request is invalid/fails + */ + public void setPolicyConfiguration(SubClusterPolicyConfiguration policyConf) + throws YarnException { + stateStore.setPolicyConfiguration( + SetSubClusterPolicyConfigurationRequest.newInstance(policyConf)); + } + + /** * Adds the home {@link SubClusterId} for the specified {@link ApplicationId}. * * @param appHomeSubCluster the mapping of the application to it's home http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/pom.xml index 9bbb936..9398b0b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/pom.xml @@ -63,6 +63,12 @@ <dependency> <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-timelineservice</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-yarn-server-resourcemanager</artifactId> </dependency> @@ -73,6 +79,12 @@ </dependency> <dependency> 
+ <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-yarn-server-common</artifactId> <type>test-jar</type> @@ -92,6 +104,12 @@ <plugin> <groupId>org.apache.rat</groupId> <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes> + <exclude>src/test/resources/schedulerInfo1.json</exclude> + <exclude>src/test/resources/schedulerInfo2.json</exclude> + </excludes> + </configuration> </plugin> </plugins> </build> http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java index da8a383..6b0a5a4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java @@ -28,4 +28,8 @@ public interface GPGContext { FederationStateStoreFacade getStateStoreFacade(); void setStateStoreFacade(FederationStateStoreFacade facade); + + GPGPolicyFacade getPolicyFacade(); + + void setPolicyFacade(GPGPolicyFacade facade); } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java index 3884ace..bb49844 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade public class GPGContextImpl implements GPGContext { private FederationStateStoreFacade facade; + private GPGPolicyFacade policyFacade; @Override public FederationStateStoreFacade getStateStoreFacade() { @@ -38,4 +39,13 @@ public class GPGContextImpl implements GPGContext { this.facade = federationStateStoreFacade; } + @Override + public GPGPolicyFacade getPolicyFacade(){ + return policyFacade; + } + + @Override + public void setPolicyFacade(GPGPolicyFacade gpgPolicyfacade){ + policyFacade = gpgPolicyfacade; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGPolicyFacade.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGPolicyFacade.java new file mode 100644 index 0000000..4c61a14 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGPolicyFacade.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.hadoop.yarn.server.globalpolicygenerator; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager; +import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy; +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +/** + * A utility class for the GPG Policy Generator to read and write policies + * into the FederationStateStore. Policy specific logic is abstracted away in + * this class, so the PolicyGenerator can avoid dealing with policy + * construction, reinitialization, and serialization. + * + * There are only two exposed methods: + * + * {@link #getPolicyManager(String)} + * Gets the PolicyManager via queue name. Null if there is no policy + * configured for the specified queue. The PolicyManager can be used to + * extract the {@link FederationRouterPolicy} and + * {@link FederationAMRMProxyPolicy}, as well as any policy specific parameters + * + * {@link #setPolicyManager(FederationPolicyManager)} + * Sets the PolicyManager. If the policy configuration is the same, no change + * occurs. 
Otherwise, the internal cache is updated and the new configuration + * is written into the FederationStateStore + * + * This class assumes that the GPG is the only service + * writing policies. Thus, the only FederationStateStore reads occur the first + * time a queue policy is retrieved - after that, the GPG only writes to the + * FederationStateStore. + * + * The class uses a PolicyManager cache and a SubClusterPolicyConfiguration + * cache. The primary use for these caches are to serve reads, and to + * identify when the PolicyGenerator has actually changed the policy + * so unnecessary FederationStateStore policy writes can be avoided. + */ + +public class GPGPolicyFacade { + + private static final Logger LOG = + LoggerFactory.getLogger(GPGPolicyFacade.class); + + private FederationStateStoreFacade stateStore; + + private Map<String, FederationPolicyManager> policyManagerMap; + private Map<String, SubClusterPolicyConfiguration> policyConfMap; + + private boolean readOnly; + + public GPGPolicyFacade(FederationStateStoreFacade stateStore, + Configuration conf) { + this.stateStore = stateStore; + this.policyManagerMap = new HashMap<>(); + this.policyConfMap = new HashMap<>(); + this.readOnly = + conf.getBoolean(YarnConfiguration.GPG_POLICY_GENERATOR_READONLY, + YarnConfiguration.DEFAULT_GPG_POLICY_GENERATOR_READONLY); + } + + /** + * Provides a utility for the policy generator to read the policy manager + * from the FederationStateStore. Because the policy generator should be the + * only component updating the policy, this implementation does not use the + * reinitialization feature. + * + * @param queueName the name of the queue we want the policy manager for. + * @return the policy manager responsible for the queue policy. 
+ */ + public FederationPolicyManager getPolicyManager(String queueName) + throws YarnException { + FederationPolicyManager policyManager = policyManagerMap.get(queueName); + // If we don't have the policy manager cached, pull configuration + // from the FederationStateStore to create and cache it + if (policyManager == null) { + try { + // If we don't have the configuration cached, pull it + // from the stateStore + SubClusterPolicyConfiguration conf = policyConfMap.get(queueName); + if (conf == null) { + conf = stateStore.getPolicyConfiguration(queueName); + } + // If configuration is still null, it does not exist in the + // FederationStateStore + if (conf == null) { + LOG.info("Read null policy for queue {}", queueName); + return null; + } + policyManager = + FederationPolicyUtils.instantiatePolicyManager(conf.getType()); + policyManager.setQueue(queueName); + + // TODO there is currently no way to cleanly deserialize a policy + // manager sub type from just the configuration + if (policyManager instanceof WeightedLocalityPolicyManager) { + WeightedPolicyInfo wpinfo = + WeightedPolicyInfo.fromByteBuffer(conf.getParams()); + WeightedLocalityPolicyManager wlpmanager = + (WeightedLocalityPolicyManager) policyManager; + LOG.info("Updating policy for queue {} to configured weights router: " + + "{}, amrmproxy: {}", queueName, + wpinfo.getRouterPolicyWeights(), + wpinfo.getAMRMPolicyWeights()); + wlpmanager.setWeightedPolicyInfo(wpinfo); + } else { + LOG.warn("Warning: FederationPolicyManager of unsupported type {}, " + + "initialization may be incomplete ", policyManager.getClass()); + } + + policyManagerMap.put(queueName, policyManager); + policyConfMap.put(queueName, conf); + } catch (YarnException e) { + LOG.error("Error reading SubClusterPolicyConfiguration from state " + + "store for queue: {}", queueName); + throw e; + } + } + return policyManager; + } + + /** + * Provides a utility for the policy generator to write a policy manager + * into the 
FederationStateStore. The facade keeps a cache and will only write + * into the FederationStateStore if the policy configuration has changed. + * + * @param policyManager The policy manager we want to update into the state + * store. It contains policy information as well as + * the queue name we will update for. + */ + public void setPolicyManager(FederationPolicyManager policyManager) + throws YarnException { + if (policyManager == null) { + LOG.warn("Attempting to set null policy manager"); + return; + } + // Extract the configuration from the policy manager + String queue = policyManager.getQueue(); + SubClusterPolicyConfiguration conf; + try { + conf = policyManager.serializeConf(); + } catch (FederationPolicyInitializationException e) { + LOG.warn("Error serializing policy for queue {}", queue); + throw e; + } + if (conf == null) { + // State store does not currently support setting a policy back to null + // because it reads the queue name to set from the policy! + LOG.warn("Skip setting policy to null for queue {} into state store", + queue); + return; + } + // Compare with configuration cache, if different, write the conf into + // store and update our conf and manager cache + if (!confCacheEqual(queue, conf)) { + try { + if (readOnly) { + LOG.info("[read-only] Skipping policy update for queue {}", queue); + return; + } + LOG.info("Updating policy for queue {} into state store", queue); + stateStore.setPolicyConfiguration(conf); + policyConfMap.put(queue, conf); + policyManagerMap.put(queue, policyManager); + } catch (YarnException e) { + LOG.warn("Error writing SubClusterPolicyConfiguration to state " + + "store for queue: {}", queue); + throw e; + } + } else { + LOG.info("Setting unchanged policy - state store write skipped"); + } + } + + /** + * @param queue the queue to check the cached policy configuration for + * @param conf the new policy configuration + * @return whether or not the conf is equal to the cached conf + */ + private boolean 
confCacheEqual(String queue, + SubClusterPolicyConfiguration conf) { + SubClusterPolicyConfiguration cachedConf = policyConfMap.get(queue); + if (conf == null && cachedConf == null) { + return true; + } else if (conf != null && cachedConf != null) { + if (conf.equals(cachedConf)) { + return true; + } + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java new file mode 100644 index 0000000..429bec4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.globalpolicygenerator; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.core.MediaType; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; + +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; + +/** + * GPGUtils contains utility functions for the GPG. + * + */ +public final class GPGUtils { + + // hide constructor + private GPGUtils() { + } + + /** + * Performs an invocation of the remote RMWebService. + */ + public static <T> T invokeRMWebService(Configuration conf, String webAddr, + String path, final Class<T> returnType) { + Client client = Client.create(); + T obj = null; + + WebResource webResource = client.resource(webAddr); + ClientResponse response = webResource.path("ws/v1/cluster").path(path) + .accept(MediaType.APPLICATION_XML).get(ClientResponse.class); + if (response.getStatus() == HttpServletResponse.SC_OK) { + obj = response.getEntity(returnType); + } else { + throw new YarnRuntimeException("Bad response from remote web service: " + + response.getStatus()); + } + return obj; + } + + /** + * Creates a uniform weighting of 1.0 for each sub cluster. 
+ */ + public static Map<SubClusterIdInfo, Float> createUniformWeights( + Set<SubClusterId> ids) { + Map<SubClusterIdInfo, Float> weights = + new HashMap<>(); + for(SubClusterId id : ids) { + weights.put(new SubClusterIdInfo(id), 1.0f); + } + return weights; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java index f6cfba0..88b9f2b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java @@ -31,6 +31,7 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.PolicyGenerator; import org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner.SubClusterCleaner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,6 +63,7 @@ public class GlobalPolicyGenerator extends CompositeService { // Scheduler service that runs 
tasks periodically private ScheduledThreadPoolExecutor scheduledExecutorService; private SubClusterCleaner subClusterCleaner; + private PolicyGenerator policyGenerator; public GlobalPolicyGenerator() { super(GlobalPolicyGenerator.class.getName()); @@ -73,11 +75,15 @@ public class GlobalPolicyGenerator extends CompositeService { // Set up the context this.gpgContext .setStateStoreFacade(FederationStateStoreFacade.getInstance()); + this.gpgContext + .setPolicyFacade(new GPGPolicyFacade( + this.gpgContext.getStateStoreFacade(), conf)); this.scheduledExecutorService = new ScheduledThreadPoolExecutor( conf.getInt(YarnConfiguration.GPG_SCHEDULED_EXECUTOR_THREADS, YarnConfiguration.DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS)); this.subClusterCleaner = new SubClusterCleaner(conf, this.gpgContext); + this.policyGenerator = new PolicyGenerator(conf, this.gpgContext); DefaultMetricsSystem.initialize(METRICS_NAME); @@ -99,6 +105,17 @@ public class GlobalPolicyGenerator extends CompositeService { LOG.info("Scheduled sub-cluster cleaner with interval: {}", DurationFormatUtils.formatDurationISO(scCleanerIntervalMs)); } + + // Schedule PolicyGenerator + long policyGeneratorIntervalMillis = getConfig().getLong( + YarnConfiguration.GPG_POLICY_GENERATOR_INTERVAL_MS, + YarnConfiguration.DEFAULT_GPG_POLICY_GENERATOR_INTERVAL_MS); + if(policyGeneratorIntervalMillis > 0){ + this.scheduledExecutorService.scheduleAtFixedRate(this.policyGenerator, + 0, policyGeneratorIntervalMillis, TimeUnit.MILLISECONDS); + LOG.info("Scheduled policygenerator with interval: {}", + DurationFormatUtils.formatDurationISO(policyGeneratorIntervalMillis)); + } } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/GlobalPolicy.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/GlobalPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/GlobalPolicy.java new file mode 100644 index 0000000..38d762d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/GlobalPolicy.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; + +import java.util.Collections; +import java.util.Map; + +/** + * This interface defines the plug-able policy that the PolicyGenerator uses + * to update policies into the state store. + */ + +public abstract class GlobalPolicy implements Configurable { + + private Configuration conf; + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public Configuration getConf() { + return conf; + } + + /** + * Return a map of the object type and RM path to request it from - the + * framework will query these paths and provide the objects to the policy. + * Delegating this responsibility to the PolicyGenerator enables us to avoid + * duplicate calls to the same endpoints as the GlobalPolicy is invoked + * once per queue. + */ + protected Map<Class, String> registerPaths() { + // Default register nothing + return Collections.emptyMap(); + } + + /** + * Given a queue, cluster metrics, and policy manager, update the policy + * to account for the cluster status. This method defines the policy generator + * behavior. 
+ * + * @param queueName name of the queue + * @param clusterInfo subClusterId map to cluster information about the + * SubCluster used to make policy decisions + * @param manager the FederationPolicyManager for the queue's existing + * policy the manager may be null, in which case the policy + * will need to be created + * @return policy manager that handles the updated (or created) policy + */ + protected abstract FederationPolicyManager updatePolicy(String queueName, + Map<SubClusterId, Map<Class, Object>> clusterInfo, + FederationPolicyManager manager); + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/NoOpGlobalPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/NoOpGlobalPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/NoOpGlobalPolicy.java new file mode 100644 index 0000000..c2d578f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/NoOpGlobalPolicy.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator; + +import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; + +import java.util.Map; + +/** + * Default policy that does not update any policy configurations. + */ +public class NoOpGlobalPolicy extends GlobalPolicy{ + + @Override + public FederationPolicyManager updatePolicy(String queueName, + Map<SubClusterId, Map<Class, Object>> clusterInfo, + FederationPolicyManager manager) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java new file mode 100644 index 0000000..5681ff0 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java @@ -0,0 +1,261 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext; +import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * The PolicyGenerator runs periodically and updates the policy configuration + * for each queue into the FederationStateStore. The policy update behavior is + * defined by the GlobalPolicy instance that is used. 
+ */ + +public class PolicyGenerator implements Runnable, Configurable { + + private static final Logger LOG = + LoggerFactory.getLogger(PolicyGenerator.class); + + private GPGContext gpgContext; + private Configuration conf; + + // Information request map + private Map<Class, String> pathMap = new HashMap<>(); + + // Global policy instance + @VisibleForTesting + protected GlobalPolicy policy; + + /** + * The PolicyGenerator periodically reads SubCluster load and updates + * policies into the FederationStateStore. + */ + public PolicyGenerator(Configuration conf, GPGContext context) { + setConf(conf); + init(context); + } + + private void init(GPGContext context) { + this.gpgContext = context; + LOG.info("Initialized PolicyGenerator"); + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + this.policy = FederationStateStoreFacade + .createInstance(conf, YarnConfiguration.GPG_GLOBAL_POLICY_CLASS, + YarnConfiguration.DEFAULT_GPG_GLOBAL_POLICY_CLASS, + GlobalPolicy.class); + policy.setConf(conf); + pathMap.putAll(policy.registerPaths()); + } + + @Override + public Configuration getConf() { + return this.conf; + } + + @Override + public final void run() { + Map<SubClusterId, SubClusterInfo> activeSubClusters; + try { + activeSubClusters = gpgContext.getStateStoreFacade().getSubClusters(true); + } catch (YarnException e) { + LOG.error("Error retrieving active sub-clusters", e); + return; + } + + // Parse the scheduler information from all the SCs + Map<SubClusterId, SchedulerInfo> schedInfo = + getSchedulerInfo(activeSubClusters); + + // Extract and enforce that all the schedulers have matching type + Set<String> queueNames = extractQueues(schedInfo); + + // Remove black listed SubClusters + activeSubClusters.keySet().removeAll(getBlackList()); + LOG.info("Active non-blacklist sub-clusters: {}", + activeSubClusters.keySet()); + + // Get cluster metrics information from non black listed RMs - later used + // to evaluate SubCluster load + 
Map<SubClusterId, Map<Class, Object>> clusterInfo = + getInfos(activeSubClusters); + + // Update into the FederationStateStore + for (String queueName : queueNames) { + // Retrieve the manager from the policy facade + FederationPolicyManager manager; + try { + manager = this.gpgContext.getPolicyFacade().getPolicyManager(queueName); + } catch (YarnException e) { + LOG.error("GetPolicy for queue {} failed", queueName, e); + continue; + } + LOG.info("Updating policy for queue {}", queueName); + manager = policy.updatePolicy(queueName, clusterInfo, manager); + try { + this.gpgContext.getPolicyFacade().setPolicyManager(manager); + } catch (YarnException e) { + LOG.error("SetPolicy for queue {} failed", queueName, e); + } + } + } + + /** + * Helper to retrieve metrics from the RM REST endpoints. + * + * @param activeSubClusters A map of active SubCluster IDs to info + */ + @VisibleForTesting + protected Map<SubClusterId, Map<Class, Object>> getInfos( + Map<SubClusterId, SubClusterInfo> activeSubClusters) { + + Map<SubClusterId, Map<Class, Object>> clusterInfo = new HashMap<>(); + for (SubClusterInfo sci : activeSubClusters.values()) { + for (Map.Entry<Class, String> e : this.pathMap.entrySet()) { + if (!clusterInfo.containsKey(sci.getSubClusterId())) { + clusterInfo.put(sci.getSubClusterId(), new HashMap<Class, Object>()); + } + Object ret = GPGUtils + .invokeRMWebService(conf, sci.getRMWebServiceAddress(), + e.getValue(), e.getKey()); + clusterInfo.get(sci.getSubClusterId()).put(e.getKey(), ret); + } + } + + return clusterInfo; + } + + /** + * Helper to retrieve SchedulerInfos. 
+ * + * @param activeSubClusters A map of active SubCluster IDs to info + */ + @VisibleForTesting + protected Map<SubClusterId, SchedulerInfo> getSchedulerInfo( + Map<SubClusterId, SubClusterInfo> activeSubClusters) { + Map<SubClusterId, SchedulerInfo> schedInfo = + new HashMap<>(); + for (SubClusterInfo sci : activeSubClusters.values()) { + SchedulerTypeInfo sti = GPGUtils + .invokeRMWebService(conf, sci.getRMWebServiceAddress(), + RMWSConsts.SCHEDULER, SchedulerTypeInfo.class); + if(sti != null){ + schedInfo.put(sci.getSubClusterId(), sti.getSchedulerInfo()); + } else { + LOG.warn("Skipped null scheduler info from SubCluster " + sci + .getSubClusterId().toString()); + } + } + return schedInfo; + } + + /** + * Helper to get a set of blacklisted SubCluster Ids from configuration. + */ + private Set<SubClusterId> getBlackList() { + String blackListParam = + conf.get(YarnConfiguration.GPG_POLICY_GENERATOR_BLACKLIST); + if(blackListParam == null){ + return Collections.emptySet(); + } + Set<SubClusterId> blackList = new HashSet<>(); + for (String id : blackListParam.split(",")) { + blackList.add(SubClusterId.newInstance(id)); + } + return blackList; + } + + /** + * Given the scheduler information for all RMs, extract the union of + * queue names - right now we only consider instances of capacity scheduler. 
+ * + * @param schedInfo the scheduler information + * @return a set of queue names + */ + private Set<String> extractQueues( + Map<SubClusterId, SchedulerInfo> schedInfo) { + Set<String> queueNames = new HashSet<String>(); + for (Map.Entry<SubClusterId, SchedulerInfo> entry : schedInfo.entrySet()) { + if (entry.getValue() instanceof CapacitySchedulerInfo) { + // Flatten the queue structure and get only non leaf queues + queueNames.addAll(flattenQueue((CapacitySchedulerInfo) entry.getValue()) + .get(CapacitySchedulerQueueInfo.class)); + } else { + LOG.warn("Skipping SubCluster {}, not configured with capacity " + + "scheduler", entry.getKey()); + } + } + return queueNames; + } + + // Helpers to flatten the queue structure into a multimap of + // queue type to set of queue names + private Map<Class, Set<String>> flattenQueue(CapacitySchedulerInfo csi) { + Map<Class, Set<String>> flattened = new HashMap<Class, Set<String>>(); + addOrAppend(flattened, csi.getClass(), csi.getQueueName()); + for (CapacitySchedulerQueueInfo csqi : csi.getQueues().getQueueInfoList()) { + flattenQueue(csqi, flattened); + } + return flattened; + } + + private void flattenQueue(CapacitySchedulerQueueInfo csi, + Map<Class, Set<String>> flattened) { + addOrAppend(flattened, csi.getClass(), csi.getQueueName()); + if (csi.getQueues() != null) { + for (CapacitySchedulerQueueInfo csqi : csi.getQueues() + .getQueueInfoList()) { + flattenQueue(csqi, flattened); + } + } + } + + private <K, V> void addOrAppend(Map<K, Set<V>> multimap, K key, V value) { + if (!multimap.containsKey(key)) { + multimap.put(key, new HashSet<V>()); + } + multimap.get(key).add(value); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/UniformWeightedLocalityGlobalPolicy.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/UniformWeightedLocalityGlobalPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/UniformWeightedLocalityGlobalPolicy.java new file mode 100644 index 0000000..826cb02 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/UniformWeightedLocalityGlobalPolicy.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator; + +import org.apache.commons.math3.optim.nonlinear.vector.Weight; +import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager; +import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; +import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** + * Simple policy that generates and updates uniform weighted locality + * policies. + */ +public class UniformWeightedLocalityGlobalPolicy extends GlobalPolicy{ + + private static final Logger LOG = + LoggerFactory.getLogger(UniformWeightedLocalityGlobalPolicy.class); + + @Override + protected FederationPolicyManager updatePolicy(String queueName, + Map<SubClusterId, Map<Class, Object>> clusterInfo, + FederationPolicyManager currentManager){ + if(currentManager == null){ + // Set uniform weights for all SubClusters + LOG.info("Creating uniform weighted policy queue {}", queueName); + WeightedLocalityPolicyManager manager = + new WeightedLocalityPolicyManager(); + manager.setQueue(queueName); + Map<SubClusterIdInfo, Float> policyWeights = + GPGUtils.createUniformWeights(clusterInfo.keySet()); + manager.getWeightedPolicyInfo().setAMRMPolicyWeights(policyWeights); + manager.getWeightedPolicyInfo().setRouterPolicyWeights(policyWeights); + currentManager = manager; + } + if(currentManager instanceof WeightedLocalityPolicyManager){ + LOG.info("Updating policy for queue {} to default weights", queueName); + WeightedLocalityPolicyManager wlpmanager = + (WeightedLocalityPolicyManager) currentManager; + wlpmanager.getWeightedPolicyInfo().setAMRMPolicyWeights( + GPGUtils.createUniformWeights(clusterInfo.keySet())); + 
wlpmanager.getWeightedPolicyInfo().setRouterPolicyWeights( + GPGUtils.createUniformWeights(clusterInfo.keySet())); + } else { + LOG.info("Policy for queue {} is of type {}, expected {}", + queueName, currentManager.getClass(), Weight.class); + } + return currentManager; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/package-info.java new file mode 100644 index 0000000..e8ff436 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/package-info.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Classes comprising the policy generator for the GPG. Responsibilities include + * generating and updating policies based on the cluster status. + */ + +package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator; http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/TestGPGPolicyFacade.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/TestGPGPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/TestGPGPolicyFacade.java new file mode 100644 index 0000000..d78c11f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/TestGPGPolicyFacade.java @@ -0,0 +1,202 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.globalpolicygenerator; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager; +import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Matchers; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +/** + * Unit test for GPG Policy Facade. 
+ */ +public class TestGPGPolicyFacade { + + private Configuration conf; + private FederationStateStore stateStore; + private FederationStateStoreFacade facade = + FederationStateStoreFacade.getInstance(); + private GPGPolicyFacade policyFacade; + + private Set<SubClusterId> subClusterIds; + + private SubClusterPolicyConfiguration testConf; + + private static final String TEST_QUEUE = "test-queue"; + + public TestGPGPolicyFacade() { + conf = new Configuration(); + conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0); + subClusterIds = new HashSet<>(); + subClusterIds.add(SubClusterId.newInstance("sc0")); + subClusterIds.add(SubClusterId.newInstance("sc1")); + subClusterIds.add(SubClusterId.newInstance("sc2")); + } + + @Before + public void setUp() throws IOException, YarnException { + stateStore = new MemoryFederationStateStore(); + stateStore.init(conf); + facade.reinitialize(stateStore, conf); + policyFacade = new GPGPolicyFacade(facade, conf); + WeightedLocalityPolicyManager manager = + new WeightedLocalityPolicyManager(); + // Add a test policy for test queue + manager.setQueue(TEST_QUEUE); + manager.getWeightedPolicyInfo().setAMRMPolicyWeights( + GPGUtils.createUniformWeights(subClusterIds)); + manager.getWeightedPolicyInfo().setRouterPolicyWeights( + GPGUtils.createUniformWeights(subClusterIds)); + testConf = manager.serializeConf(); + stateStore.setPolicyConfiguration(SetSubClusterPolicyConfigurationRequest + .newInstance(testConf)); + } + + @After + public void tearDown() throws Exception { + stateStore.close(); + stateStore = null; + } + + @Test + public void testGetPolicy() throws YarnException { + WeightedLocalityPolicyManager manager = + (WeightedLocalityPolicyManager) policyFacade + .getPolicyManager(TEST_QUEUE); + Assert.assertEquals(testConf, manager.serializeConf()); + } + + /** + * Test that new policies are written into the state store. 
+ */ + @Test + public void testSetNewPolicy() throws YarnException { + WeightedLocalityPolicyManager manager = + new WeightedLocalityPolicyManager(); + manager.setQueue(TEST_QUEUE + 0); + manager.getWeightedPolicyInfo().setAMRMPolicyWeights( + GPGUtils.createUniformWeights(subClusterIds)); + manager.getWeightedPolicyInfo().setRouterPolicyWeights( + GPGUtils.createUniformWeights(subClusterIds)); + SubClusterPolicyConfiguration policyConf = manager.serializeConf(); + policyFacade.setPolicyManager(manager); + + manager = + (WeightedLocalityPolicyManager) policyFacade + .getPolicyManager(TEST_QUEUE + 0); + Assert.assertEquals(policyConf, manager.serializeConf()); + } + + /** + * Test that overwriting policies are updated in the state store. + */ + @Test + public void testOverwritePolicy() throws YarnException { + subClusterIds.add(SubClusterId.newInstance("sc3")); + WeightedLocalityPolicyManager manager = + new WeightedLocalityPolicyManager(); + manager.setQueue(TEST_QUEUE); + manager.getWeightedPolicyInfo().setAMRMPolicyWeights( + GPGUtils.createUniformWeights(subClusterIds)); + manager.getWeightedPolicyInfo().setRouterPolicyWeights( + GPGUtils.createUniformWeights(subClusterIds)); + SubClusterPolicyConfiguration policyConf = manager.serializeConf(); + policyFacade.setPolicyManager(manager); + + manager = + (WeightedLocalityPolicyManager) policyFacade + .getPolicyManager(TEST_QUEUE); + Assert.assertEquals(policyConf, manager.serializeConf()); + } + + /** + * Test that the write through cache works. 
+ */ + @Test + public void testWriteCache() throws YarnException { + stateStore = mock(MemoryFederationStateStore.class); + facade.reinitialize(stateStore, conf); + when(stateStore.getPolicyConfiguration(Matchers.any( + GetSubClusterPolicyConfigurationRequest.class))).thenReturn( + GetSubClusterPolicyConfigurationResponse.newInstance(testConf)); + policyFacade = new GPGPolicyFacade(facade, conf); + + // Query once to fill the cache + FederationPolicyManager manager = policyFacade.getPolicyManager(TEST_QUEUE); + // State store should be contacted once + verify(stateStore, times(1)).getPolicyConfiguration( + Matchers.any(GetSubClusterPolicyConfigurationRequest.class)); + + // If we set the same policy, the state store should be untouched + policyFacade.setPolicyManager(manager); + verify(stateStore, times(0)).setPolicyConfiguration( + Matchers.any(SetSubClusterPolicyConfigurationRequest.class)); + } + + /** + * Test that when read only is enabled, the state store is not changed. + */ + @Test + public void testReadOnly() throws YarnException { + conf.setBoolean(YarnConfiguration.GPG_POLICY_GENERATOR_READONLY, true); + stateStore = mock(MemoryFederationStateStore.class); + facade.reinitialize(stateStore, conf); + when(stateStore.getPolicyConfiguration(Matchers.any( + GetSubClusterPolicyConfigurationRequest.class))).thenReturn( + GetSubClusterPolicyConfigurationResponse.newInstance(testConf)); + policyFacade = new GPGPolicyFacade(facade, conf); + + // If we set a policy, the state store should be untouched + WeightedLocalityPolicyManager manager = + new WeightedLocalityPolicyManager(); + // Add a test policy for test queue + manager.setQueue(TEST_QUEUE); + manager.getWeightedPolicyInfo().setAMRMPolicyWeights( + GPGUtils.createUniformWeights(subClusterIds)); + manager.getWeightedPolicyInfo().setRouterPolicyWeights( + GPGUtils.createUniformWeights(subClusterIds)); + policyFacade.setPolicyManager(manager); + verify(stateStore, times(0)).setPolicyConfiguration( + 
Matchers.any(SetSubClusterPolicyConfigurationRequest.class)); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java new file mode 100644 index 0000000..9d27b3b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java @@ -0,0 +1,338 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator; + +import com.sun.jersey.api.json.JSONConfiguration; +import com.sun.jersey.api.json.JSONJAXBContext; +import com.sun.jersey.api.json.JSONUnmarshaller; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager; +import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext; +import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContextImpl; +import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGPolicyFacade; +import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo; +import org.apache.hadoop.yarn.webapp.util.WebAppUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +import javax.xml.bind.JAXBException; +import java.io.IOException; +import java.io.StringReader; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Unit test for GPG Policy Generator. 
+ */ +public class TestPolicyGenerator { + + private static final int NUM_SC = 3; + + private Configuration conf; + private FederationStateStore stateStore; + private FederationStateStoreFacade facade = + FederationStateStoreFacade.getInstance(); + + private List<SubClusterId> subClusterIds; + private Map<SubClusterId, SubClusterInfo> subClusterInfos; + private Map<SubClusterId, Map<Class, Object>> clusterInfos; + private Map<SubClusterId, SchedulerInfo> schedulerInfos; + + private GPGContext gpgContext; + + private PolicyGenerator policyGenerator; + + public TestPolicyGenerator() { + conf = new Configuration(); + conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0); + + gpgContext = new GPGContextImpl(); + gpgContext.setPolicyFacade(new GPGPolicyFacade(facade, conf)); + gpgContext.setStateStoreFacade(facade); + } + + @Before + public void setUp() throws IOException, YarnException, JAXBException { + subClusterIds = new ArrayList<>(); + subClusterInfos = new HashMap<>(); + clusterInfos = new HashMap<>(); + schedulerInfos = new HashMap<>(); + + CapacitySchedulerInfo sti1 = + readJSON("src/test/resources/schedulerInfo1.json", + CapacitySchedulerInfo.class); + CapacitySchedulerInfo sti2 = + readJSON("src/test/resources/schedulerInfo2.json", + CapacitySchedulerInfo.class); + + // Set up sub clusters + for (int i = 0; i < NUM_SC; ++i) { + // Sub cluster Id + SubClusterId id = SubClusterId.newInstance("sc" + i); + subClusterIds.add(id); + + // Sub cluster info + SubClusterInfo cluster = SubClusterInfo + .newInstance(id, "amrm:" + i, "clientrm:" + i, "rmadmin:" + i, + "rmweb:" + i, SubClusterState.SC_RUNNING, 0, ""); + subClusterInfos.put(id, cluster); + + // Cluster metrics info + ClusterMetricsInfo metricsInfo = new ClusterMetricsInfo(); + metricsInfo.setAppsPending(2000); + if (!clusterInfos.containsKey(id)) { + clusterInfos.put(id, new HashMap<Class, Object>()); + } + clusterInfos.get(id).put(ClusterMetricsInfo.class, metricsInfo); + + 
schedulerInfos.put(id, sti1); + } + + // Change one of the sub cluster schedulers + schedulerInfos.put(subClusterIds.get(0), sti2); + + stateStore = mock(FederationStateStore.class); + when(stateStore.getSubClusters((GetSubClustersInfoRequest) any())) + .thenReturn(GetSubClustersInfoResponse.newInstance( + new ArrayList<SubClusterInfo>(subClusterInfos.values()))); + facade.reinitialize(stateStore, conf); + } + + @After + public void tearDown() throws Exception { + stateStore.close(); + stateStore = null; + } + + private <T> T readJSON(String pathname, Class<T> classy) + throws IOException, JAXBException { + + JSONJAXBContext jc = + new JSONJAXBContext(JSONConfiguration.mapped().build(), classy); + JSONUnmarshaller unmarshaller = jc.createJSONUnmarshaller(); + String contents = new String(Files.readAllBytes(Paths.get(pathname))); + return unmarshaller.unmarshalFromJSON(new StringReader(contents), classy); + + } + + @Test + public void testPolicyGenerator() throws YarnException { + policyGenerator = new TestablePolicyGenerator(); + policyGenerator.policy = mock(GlobalPolicy.class); + policyGenerator.run(); + verify(policyGenerator.policy, times(1)) + .updatePolicy("default", clusterInfos, null); + verify(policyGenerator.policy, times(1)) + .updatePolicy("default2", clusterInfos, null); + } + + @Test + public void testBlacklist() throws YarnException { + conf.set(YarnConfiguration.GPG_POLICY_GENERATOR_BLACKLIST, + subClusterIds.get(0).toString()); + Map<SubClusterId, Map<Class, Object>> blacklistedCMI = + new HashMap<>(clusterInfos); + blacklistedCMI.remove(subClusterIds.get(0)); + policyGenerator = new TestablePolicyGenerator(); + policyGenerator.policy = mock(GlobalPolicy.class); + policyGenerator.run(); + verify(policyGenerator.policy, times(1)) + .updatePolicy("default", blacklistedCMI, null); + verify(policyGenerator.policy, times(0)) + .updatePolicy("default", clusterInfos, null); + } + + @Test + public void testBlacklistTwo() throws YarnException { + 
conf.set(YarnConfiguration.GPG_POLICY_GENERATOR_BLACKLIST, + subClusterIds.get(0).toString() + "," + subClusterIds.get(1) + .toString()); + Map<SubClusterId, Map<Class, Object>> blacklistedCMI = + new HashMap<>(clusterInfos); + blacklistedCMI.remove(subClusterIds.get(0)); + blacklistedCMI.remove(subClusterIds.get(1)); + policyGenerator = new TestablePolicyGenerator(); + policyGenerator.policy = mock(GlobalPolicy.class); + policyGenerator.run(); + verify(policyGenerator.policy, times(1)) + .updatePolicy("default", blacklistedCMI, null); + verify(policyGenerator.policy, times(0)) + .updatePolicy("default", clusterInfos, null); + } + + @Test + public void testExistingPolicy() throws YarnException { + WeightedLocalityPolicyManager manager = new WeightedLocalityPolicyManager(); + // Add a test policy for test queue + manager.setQueue("default"); + manager.getWeightedPolicyInfo().setAMRMPolicyWeights(GPGUtils + .createUniformWeights(new HashSet<SubClusterId>(subClusterIds))); + manager.getWeightedPolicyInfo().setRouterPolicyWeights(GPGUtils + .createUniformWeights(new HashSet<SubClusterId>(subClusterIds))); + SubClusterPolicyConfiguration testConf = manager.serializeConf(); + when(stateStore.getPolicyConfiguration( + GetSubClusterPolicyConfigurationRequest.newInstance("default"))) + .thenReturn( + GetSubClusterPolicyConfigurationResponse.newInstance(testConf)); + + policyGenerator = new TestablePolicyGenerator(); + policyGenerator.policy = mock(GlobalPolicy.class); + policyGenerator.run(); + + ArgumentCaptor<FederationPolicyManager> argCaptor = + ArgumentCaptor.forClass(FederationPolicyManager.class); + verify(policyGenerator.policy, times(1)) + .updatePolicy(eq("default"), eq(clusterInfos), argCaptor.capture()); + assertEquals(argCaptor.getValue().getClass(), manager.getClass()); + assertEquals(argCaptor.getValue().serializeConf(), manager.serializeConf()); + } + + @Test + public void testCallRM() { + + CapacitySchedulerConfiguration csConf = + new 
CapacitySchedulerConfiguration(); + + final String a = CapacitySchedulerConfiguration.ROOT + ".a"; + final String b = CapacitySchedulerConfiguration.ROOT + ".b"; + final String a1 = a + ".a1"; + final String a2 = a + ".a2"; + final String b1 = b + ".b1"; + final String b2 = b + ".b2"; + final String b3 = b + ".b3"; + float aCapacity = 10.5f; + float bCapacity = 89.5f; + float a1Capacity = 30; + float a2Capacity = 70; + float b1Capacity = 79.2f; + float b2Capacity = 0.8f; + float b3Capacity = 20; + + // Define top-level queues + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] {"a", "b"}); + + csConf.setCapacity(a, aCapacity); + csConf.setCapacity(b, bCapacity); + + // Define 2nd-level queues + csConf.setQueues(a, new String[] {"a1", "a2"}); + csConf.setCapacity(a1, a1Capacity); + csConf.setUserLimitFactor(a1, 100.0f); + csConf.setCapacity(a2, a2Capacity); + csConf.setUserLimitFactor(a2, 100.0f); + + csConf.setQueues(b, new String[] {"b1", "b2", "b3"}); + csConf.setCapacity(b1, b1Capacity); + csConf.setUserLimitFactor(b1, 100.0f); + csConf.setCapacity(b2, b2Capacity); + csConf.setUserLimitFactor(b2, 100.0f); + csConf.setCapacity(b3, b3Capacity); + csConf.setUserLimitFactor(b3, 100.0f); + + YarnConfiguration rmConf = new YarnConfiguration(csConf); + + ResourceManager resourceManager = new ResourceManager(); + rmConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + resourceManager.init(rmConf); + resourceManager.start(); + + String rmAddress = WebAppUtils.getRMWebAppURLWithScheme(this.conf); + SchedulerTypeInfo sti = GPGUtils + .invokeRMWebService(conf, rmAddress, RMWSConsts.SCHEDULER, + SchedulerTypeInfo.class); + + Assert.assertNotNull(sti); + } + + /** + * Testable policy generator overrides the methods that communicate + * with the RM REST endpoint, allowing us to inject faked responses. 
+ */ + class TestablePolicyGenerator extends PolicyGenerator { + + TestablePolicyGenerator() { + super(conf, gpgContext); + } + + @Override + protected Map<SubClusterId, Map<Class, Object>> getInfos( + Map<SubClusterId, SubClusterInfo> activeSubClusters) { + Map<SubClusterId, Map<Class, Object>> ret = new HashMap<>(); + for (SubClusterId id : activeSubClusters.keySet()) { + if (!ret.containsKey(id)) { + ret.put(id, new HashMap<Class, Object>()); + } + ret.get(id).put(ClusterMetricsInfo.class, + clusterInfos.get(id).get(ClusterMetricsInfo.class)); + } + return ret; + } + + @Override + protected Map<SubClusterId, SchedulerInfo> getSchedulerInfo( + Map<SubClusterId, SubClusterInfo> activeSubClusters) { + Map<SubClusterId, SchedulerInfo> ret = + new HashMap<SubClusterId, SchedulerInfo>(); + for (SubClusterId id : activeSubClusters.keySet()) { + ret.put(id, schedulerInfos.get(id)); + } + return ret; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/schedulerInfo1.json ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/schedulerInfo1.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/schedulerInfo1.json new file mode 100644 index 0000000..3ad4594 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/schedulerInfo1.json @@ -0,0 +1,134 @@ +{ + "capacity": 100.0, + "usedCapacity": 0.0, + "maxCapacity": 100.0, + "queueName": "root", + "queues": { + "queue": [ + { + "type": "capacitySchedulerLeafQueueInfo", + "capacity": 100.0, + "usedCapacity": 0.0, + "maxCapacity": 100.0, + "absoluteCapacity": 100.0, + "absoluteMaxCapacity": 100.0, + "absoluteUsedCapacity": 0.0, 
+ "numApplications": 484, + "queueName": "default", + "state": "RUNNING", + "resourcesUsed": { + "memory": 0, + "vCores": 0 + }, + "hideReservationQueues": false, + "nodeLabels": [ + "*" + ], + "numActiveApplications": 484, + "numPendingApplications": 0, + "numContainers": 0, + "maxApplications": 10000, + "maxApplicationsPerUser": 10000, + "userLimit": 100, + "users": { + "user": [ + { + "username": "Default", + "resourcesUsed": { + "memory": 0, + "vCores": 0 + }, + "numPendingApplications": 0, + "numActiveApplications": 468, + "AMResourceUsed": { + "memory": 30191616, + "vCores": 468 + }, + "userResourceLimit": { + "memory": 31490048, + "vCores": 7612 + } + } + ] + }, + "userLimitFactor": 1.0, + "AMResourceLimit": { + "memory": 31490048, + "vCores": 7612 + }, + "usedAMResource": { + "memory": 30388224, + "vCores": 532 + }, + "userAMResourceLimit": { + "memory": 31490048, + "vCores": 7612 + }, + "preemptionDisabled": true + } + ] + }, + "health": { + "lastrun": 1517951638085, + "operationsInfo": { + "entry": { + "key": "last-allocation", + "value": { + "nodeId": "node0:0", + "containerId": "container_e61477_1517922128312_0340_01_000001", + "queue": "root.default" + } + }, + "entry": { + "key": "last-reservation", + "value": { + "nodeId": "node0:1", + "containerId": "container_e61477_1517879828320_0249_01_000001", + "queue": "root.default" + } + }, + "entry": { + "key": "last-release", + "value": { + "nodeId": "node0:2", + "containerId": "container_e61477_1517922128312_0340_01_000001", + "queue": "root.default" + } + }, + "entry": { + "key": "last-preemption", + "value": { + "nodeId": "N/A", + "containerId": "N/A", + "queue": "N/A" + } + } + }, + "lastRunDetails": [ + { + "operation": "releases", + "count": 0, + "resources": { + "memory": 0, + "vCores": 0 + } + }, + { + "operation": "allocations", + "count": 0, + "resources": { + "memory": 0, + "vCores": 0 + } + }, + { + "operation": "reservations", + "count": 0, + "resources": { + "memory": 0, + "vCores": 0 + } + } 
+ ] + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org