This is an automated email from the ASF dual-hosted git repository.

slfan1989 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 001d353cc59 YARN-7708. BackPort [GPG] Load based policy generator. (#5902) Contributed by Young Chen.
001d353cc59 is described below

commit 001d353cc596039124c7fbb66441bb6b72408618
Author: slfan1989 <55643692+slfan1...@users.noreply.github.com>
AuthorDate: Sat Aug 5 16:33:35 2023 +0800

    YARN-7708. BackPort [GPG] Load based policy generator. (#5902) Contributed by Young Chen.
---
 .../hadoop-yarn/dev-support/findbugs-exclude.xml   |   4 +
 .../apache/hadoop/yarn/conf/YarnConfiguration.java |  18 ++
 .../src/main/resources/yarn-default.xml            |  62 ++++
 .../server/globalpolicygenerator/GPGUtils.java     |   5 +-
 .../policygenerator/GlobalPolicy.java              |   2 +-
 .../policygenerator/LoadBasedGlobalPolicy.java     | 329 +++++++++++++++++++++
 .../policygenerator/PolicyGenerator.java           |   6 +-
 .../policygenerator/TestLoadBasedGlobalPolicy.java | 206 +++++++++++++
 8 files changed, 625 insertions(+), 7 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index cf457c23eb1..309c0285800 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -209,6 +209,10 @@
    <Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.RecoveryComparator" />
     <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
   </Match>
+  <Match>
+    <Class name="org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.LoadBasedGlobalPolicy$SortByDescendingLoad" />
+    <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
+  </Match>
   <!-- Ignore some irrelevant class name warning -->
   <Match>
     <Class name="org.apache.hadoop.yarn.api.records.SerializedException" />
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index faa5c82d7e9..ae7ea196d47 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -4427,6 +4427,24 @@ public class YarnConfiguration extends Configuration {
   public static final String GPG_POLICY_GENERATOR_BLACKLIST =
       FEDERATION_GPG_POLICY_PREFIX + "blacklist";
 
+  private static final String FEDERATION_GPG_LOAD_BASED_PREFIX =
+      YarnConfiguration.FEDERATION_GPG_PREFIX + "policy.generator.load-based.";
+  public static final String FEDERATION_GPG_LOAD_BASED_MIN_PENDING =
+      FEDERATION_GPG_LOAD_BASED_PREFIX + "pending.minimum";
+  public static final int DEFAULT_FEDERATION_GPG_LOAD_BASED_MIN_PENDING = 100;
+  public static final String FEDERATION_GPG_LOAD_BASED_MAX_PENDING =
+      FEDERATION_GPG_LOAD_BASED_PREFIX + "pending.maximum";
+  public static final int DEFAULT_FEDERATION_GPG_LOAD_BASED_MAX_PENDING = 1000;
+  public static final String FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT =
+      FEDERATION_GPG_LOAD_BASED_PREFIX + "weight.minimum";
+  public static final float DEFAULT_FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT = 0.0f;
+  public static final String FEDERATION_GPG_LOAD_BASED_MAX_EDIT =
+      FEDERATION_GPG_LOAD_BASED_PREFIX + "edit.maximum";
+  public static final int DEFAULT_FEDERATION_GPG_LOAD_BASED_MAX_EDIT = 3;
+  public static final String FEDERATION_GPG_LOAD_BASED_SCALING =
+      FEDERATION_GPG_LOAD_BASED_PREFIX + "scaling";
+  public static final String DEFAULT_FEDERATION_GPG_LOAD_BASED_SCALING = "LINEAR";
+
   /**
    * Connection and Read timeout from the Router to RM.
    */
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index b643bd8d08d..6cab018e5c4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -5468,4 +5468,66 @@
     <value></value>
   </property>
 
+  <property>
+    <description>
+      GPG load policy, the minimum number of pending applications in the subCluster.
+    </description>
+    <name>yarn.federation.gpg.policy.generator.load-based.pending.minimum</name>
+    <value>100</value>
+  </property>
+
+  <property>
+    <description>
+      GPG load policy, the maximum number of pending applications in the subCluster.
+    </description>
+    <name>yarn.federation.gpg.policy.generator.load-based.pending.maximum</name>
+    <value>1000</value>
+  </property>
+
+  <property>
+    <description>
+      GPG load policy, the minimum weight of a subCluster.
+      If a subCluster has a very high load, we will assign this value to the subCluster.
+      The default value is 0, which means that we no longer assign applications to this subCluster.
+    </description>
+    <name>yarn.federation.gpg.policy.generator.load-based.weight.minimum</name>
+    <value>0</value>
+  </property>
+
+  <property>
+    <description>
+      GPG load policy, the maximum number of subCluster weights that may be adjusted at once.
+      Only the top N most heavily loaded subClusters are considered.
+    </description>
+    <name>yarn.federation.gpg.policy.generator.load-based.edit.maximum</name>
+    <value>3</value>
+  </property>
+
+  <property>
+    <description>
+      GPG load policy, which provides 4 calculation methods: NONE, LINEAR, QUADRATIC, LOG.
+
+      Note, this calculation is applied when the number of pending applications in
+      the subCluster is less than yarn.federation.gpg.policy.generator.load-based.pending.maximum.
+
+      maxPendingVal = yarn.federation.gpg.policy.generator.load-based.pending.maximum -
+      yarn.federation.gpg.policy.generator.load-based.pending.minimum
+      curPendingVal = pending applications in the subCluster -
+      yarn.federation.gpg.policy.generator.load-based.pending.minimum
+
+      1. NONE: no calculation is required, and the weight is 1.
+      2. LINEAR: linear scaling, the weight is (maxPendingVal - curPendingVal) / maxPendingVal.
+      3. QUADRATIC: quadratic scaling, maxPendingVal and curPendingVal are squared first,
+      then the formula (maxPendingVal - curPendingVal) / maxPendingVal is applied to the squares.
+      4. LOG (LOGARITHM): logarithmic scaling, the logarithm of maxPendingVal and curPendingVal
+      is taken first, then the formula (maxPendingVal - curPendingVal) / maxPendingVal
+      is applied to the logarithms.
+
+      LINEAR is used by default.
+    </description>
+    <name>yarn.federation.gpg.policy.generator.load-based.scaling</name>
+    <value>LINEAR</value>
+  </property>
+
 </configuration>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
index 6d2e1d41421..2bb56caeffb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
@@ -55,7 +55,7 @@ public final class GPGUtils {
    */
  public static <T> T invokeRMWebService(String webAddr, String path, final Class<T> returnType) {
     Client client = Client.create();
-    T obj = null;
+    T obj;
 
     WebResource webResource = client.resource(webAddr);
     ClientResponse response = null;
@@ -86,8 +86,7 @@ public final class GPGUtils {
    */
   public static Map<SubClusterIdInfo, Float> createUniformWeights(
       Set<SubClusterId> ids) {
-    Map<SubClusterIdInfo, Float> weights =
-        new HashMap<>();
+    Map<SubClusterIdInfo, Float> weights = new HashMap<>();
     for(SubClusterId id : ids) {
       weights.put(new SubClusterIdInfo(id), 1.0f);
     }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/GlobalPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/GlobalPolicy.java
index c6d6558dbec..ab60a48434e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/GlobalPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/GlobalPolicy.java
@@ -53,7 +53,7 @@ public abstract class GlobalPolicy implements Configurable {
    *
    * @return a map of the object type and RM path.
    */
-  protected Map<Class, String> registerPaths() {
+  protected Map<Class<?>, String> registerPaths() {
     // Default register nothing
     return Collections.emptyMap();
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/LoadBasedGlobalPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/LoadBasedGlobalPolicy.java
new file mode 100644
index 00000000000..f728b92d71f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/LoadBasedGlobalPolicy.java
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator;
+
+import org.apache.commons.collections.MapUtils;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
+import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.FEDERATION_GPG_LOAD_BASED_MIN_PENDING;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_FEDERATION_GPG_LOAD_BASED_MIN_PENDING;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.FEDERATION_GPG_LOAD_BASED_MAX_PENDING;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_FEDERATION_GPG_LOAD_BASED_MAX_PENDING;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.FEDERATION_GPG_LOAD_BASED_MAX_EDIT;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_FEDERATION_GPG_LOAD_BASED_MAX_EDIT;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.FEDERATION_GPG_LOAD_BASED_SCALING;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_FEDERATION_GPG_LOAD_BASED_SCALING;
+
+/**
+ * Load based policy that generates weighted policies by scaling
+ * the cluster load (based on pending) to a weight from 0.0 to 1.0.
+ */
+public class LoadBasedGlobalPolicy extends GlobalPolicy {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LoadBasedGlobalPolicy.class);
+
+  public enum Scaling {
+    LINEAR,
+    QUADRATIC,
+    LOG,
+    NONE
+  }
+
+  // Minimum pending count before the policy starts scaling down the weights
+  private int minPending;
+  // Maximum pending count before policy stops scaling down the weights
+  // (they'll be set to min weight)
+  private int maxPending;
+  // Minimum weight that a sub cluster will be assigned
+  private float minWeight;
+  // Maximum number of weights that can be scaled down simultaneously
+  private int maxEdit;
+  // Scaling type
+  private Scaling scaling = Scaling.NONE;
+
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+    minPending = conf.getInt(FEDERATION_GPG_LOAD_BASED_MIN_PENDING,
+        DEFAULT_FEDERATION_GPG_LOAD_BASED_MIN_PENDING);
+    maxPending = conf.getInt(FEDERATION_GPG_LOAD_BASED_MAX_PENDING,
+        DEFAULT_FEDERATION_GPG_LOAD_BASED_MAX_PENDING);
+    minWeight = conf.getFloat(FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT,
+        DEFAULT_FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT);
+    maxEdit = conf.getInt(FEDERATION_GPG_LOAD_BASED_MAX_EDIT,
+        DEFAULT_FEDERATION_GPG_LOAD_BASED_MAX_EDIT);
+
+    try {
+      scaling = Scaling.valueOf(conf.get(FEDERATION_GPG_LOAD_BASED_SCALING,
+          DEFAULT_FEDERATION_GPG_LOAD_BASED_SCALING));
+    } catch (IllegalArgumentException e) {
+      LOG.warn("Invalid scaling mode provided", e);
+    }
+
+    // Check that all configuration values are valid
+    if (!(minPending <= maxPending)) {
+      throw new YarnRuntimeException("minPending = " + minPending
+          + " must be less than or equal to maxPending=" + maxPending);
+    }
+    if (!(minWeight >= 0 && minWeight < 1)) {
+      throw new YarnRuntimeException(
+          "minWeight = " + minWeight + " must be within range [0,1)");
+    }
+  }
+
+  @Override
+  protected Map<Class<?>, String> registerPaths() {
+    // Register for the endpoints we want to receive information on
+    Map<Class<?>, String> map = new HashMap<>();
+    map.put(ClusterMetricsInfo.class, RMWSConsts.METRICS);
+    return map;
+  }
+
+  /**
+   * Update the policy of the queue.
+   *
+   * @param queueName   name of the queue
+   * @param clusterInfo subClusterId map to cluster information about the
+   *                    SubCluster used to make policy decisions
+   * @param currentManager the FederationPolicyManager for the queue's existing
+   * policy; the manager may be null, in which case the policy
+   * will need to be created.
+   *
+   * @return FederationPolicyManager.
+   */
+  @Override
+  protected FederationPolicyManager updatePolicy(String queueName,
+      Map<SubClusterId, Map<Class, Object>> clusterInfo,
+      FederationPolicyManager currentManager) {
+    if (currentManager == null) {
+      LOG.info("Creating load based weighted policy queue {}.", queueName);
+      currentManager = getWeightedLocalityPolicyManager(queueName, clusterInfo);
+    } else if (currentManager instanceof WeightedLocalityPolicyManager) {
+      LOG.info("Updating load based weighted policy queue {}.", queueName);
+      currentManager = getWeightedLocalityPolicyManager(queueName, clusterInfo);
+    } else {
+      LOG.warn("Policy for queue {} is of type {}, expected {}.", queueName,
+          currentManager.getClass(), WeightedLocalityPolicyManager.class);
+    }
+    return currentManager;
+  }
+
+  /**
+   * GPG can help update the policy of the queue.
+   *
+   * We automatically generate the weight of the subCluster
+   * according to the clusterMetrics of the subCluster.
+   *
+   * @param queue queueName.
+   * @param subClusterMetricInfos Metric information of the subCluster.
+   * @return WeightedLocalityPolicyManager.
+   */
+  protected WeightedLocalityPolicyManager getWeightedLocalityPolicyManager(String queue,
+      Map<SubClusterId, Map<Class, Object>> subClusterMetricInfos) {
+
+    // Parse the metric information of the subCluster.
+    Map<SubClusterId, ClusterMetricsInfo> clusterMetrics =
+        getSubClustersMetricsInfo(subClusterMetricInfos);
+
+    if (MapUtils.isEmpty(clusterMetrics)) {
+      return null;
+    }
+
+    // Get the new weight of the subCluster.
+    WeightedLocalityPolicyManager manager = new WeightedLocalityPolicyManager();
+    Map<SubClusterIdInfo, Float> weights = getTargetWeights(clusterMetrics);
+    manager.setQueue(queue);
+    manager.getWeightedPolicyInfo().setAMRMPolicyWeights(weights);
+    manager.getWeightedPolicyInfo().setRouterPolicyWeights(weights);
+    return manager;
+  }
+
+  /**
+   * Get the ClusterMetric information of the subCluster.
+   *
+   * @param subClusterMetricsInfo subCluster Metric Information.
+   * @return Mapping relationship between subCluster and Metric.
+   */
+  protected Map<SubClusterId, ClusterMetricsInfo> getSubClustersMetricsInfo(
+      Map<SubClusterId, Map<Class, Object>> subClusterMetricsInfo) {
+
+    // Check whether the Metric information of the sub-cluster is empty,
+    // if it is empty, we will directly return null.
+    if(MapUtils.isEmpty(subClusterMetricsInfo)) {
+      LOG.warn("The metric info of the subCluster is empty.");
+      return null;
+    }
+
+    Map<SubClusterId, ClusterMetricsInfo> clusterMetrics = new HashMap<>();
+    for (Map.Entry<SubClusterId, Map<Class, Object>> entry : subClusterMetricsInfo.entrySet()) {
+      SubClusterId subClusterId = entry.getKey();
+      Map<Class, Object> subClusterMetrics = entry.getValue();
+      ClusterMetricsInfo clusterMetricsInfo = (ClusterMetricsInfo)
+          subClusterMetrics.getOrDefault(ClusterMetricsInfo.class, null);
+      clusterMetrics.put(subClusterId, clusterMetricsInfo);
+    }
+
+    // return subCluster Metric Information.
+    return clusterMetrics;
+  }
+
+  /**
+   * Get subCluster target weight.
+   *
+   * @param clusterMetrics Metric of the subCluster.
+   * @return subCluster Weights.
+   */
+  @VisibleForTesting
+  protected Map<SubClusterIdInfo, Float> getTargetWeights(
+      Map<SubClusterId, ClusterMetricsInfo> clusterMetrics) {
+    Map<SubClusterIdInfo, Float> weights = GPGUtils.createUniformWeights(clusterMetrics.keySet());
+
+    List<SubClusterId> scs = new ArrayList<>(clusterMetrics.keySet());
+    // Sort the sub clusters into descending order based on pending load
+    scs.sort(new SortByDescendingLoad(clusterMetrics));
+
+    // Keep the top N loaded sub clusters
+    scs = scs.subList(0, Math.min(maxEdit, scs.size()));
+
+    for (SubClusterId sc : scs) {
+      LOG.info("Updating weight for sub cluster {}", sc.toString());
+      int pending = clusterMetrics.get(sc).getAppsPending();
+      if (pending <= minPending) {
+        LOG.info("Load ({}) is lower than minimum ({}), skipping", pending, 
minPending);
+      } else if (pending < maxPending) {
+        // The different scaling strategies should all map values from the
+        // range min_pending+1 to max_pending to the range min_weight to 1.0f
+        // so we pre-process and simplify the domain to some value [1, MAX-MIN)
+        int val = pending - minPending;
+        int maxVal = maxPending - minPending;
+
+        // Scale the weights to respect the config minimum
+        float weight = getWeightByScaling(maxVal, val);
+        weight = weight * (1.0f - minWeight);
+        weight += minWeight;
+        weights.put(new SubClusterIdInfo(sc), weight);
+        LOG.info("Load ({}) is within maximum ({}), setting weights via {} "
+            + "scale to {}", pending, maxPending, scaling, weight);
+      } else {
+        weights.put(new SubClusterIdInfo(sc), minWeight);
+        LOG.info("Load ({}) exceeded maximum ({}), setting weight to minimum: 
{}",
+            pending, maxPending, minWeight);
+      }
+    }
+    validateWeights(weights);
+    return weights;
+  }
+
+  /**
+   * Get the weight for a subCluster, calculated according to the configured Scaling.
+   *
+   * NONE: no calculation is required, and the weight is 1.
+   *
+   * LINEAR: linear scaling, the weight is (maxPendingVal - curPendingVal) / maxPendingVal.
+   *
+   * QUADRATIC: quadratic scaling,
+   * maxPendingVal and curPendingVal are squared first,
+   * then the formula (maxPendingVal - curPendingVal) / maxPendingVal is applied to the squares.
+   *
+   * LOG (LOGARITHM): logarithmic scaling,
+   * the logarithm of maxPendingVal and curPendingVal is taken first,
+   * then the formula (maxPendingVal - curPendingVal) / maxPendingVal is applied to the logarithms.
+   *
+   * @param maxPendingVal maxPending - minPending
+   * @param curPendingVal pending - minPending
+   * @return the calculated weight.
+   */
+  protected float getWeightByScaling(int maxPendingVal, int curPendingVal) {
+    float weight = 1.0f;
+    switch (scaling) {
+    case NONE:
+      break;
+    case LINEAR:
+      weight = (float) (maxPendingVal - curPendingVal) / (float) (maxPendingVal);
+      break;
+    case QUADRATIC:
+      double maxValQuad = Math.pow(maxPendingVal, 2);
+      double valQuad = Math.pow(curPendingVal, 2);
+      weight = (float) (maxValQuad - valQuad) / (float) (maxValQuad);
+      break;
+    case LOG:
+      double maxValLog = Math.log(maxPendingVal);
+      double valLog = Math.log(curPendingVal);
+      weight = (float) (maxValLog - valLog) / (float) (maxValLog);
+      break;
+    default:
+      LOG.warn("No suitable scaling found, Skip.");
+      break;
+    }
+    return weight;
+  }
+
+  /**
+   * Helper to avoid all zero weights. If weights are all zero, they're reset
+   * to one.
+   * @param weights weights to validate.
+   */
+  private void validateWeights(Map<SubClusterIdInfo, Float> weights) {
+    for(Float w : weights.values()) {
+      // If we find a nonzero weight, we're validated
+      if(w > 0.0f) {
+        return;
+      }
+    }
+    LOG.warn("All {} generated weights were 0.0f. Resetting to 1.0f.", 
weights.size());
+    // All weights were zero. Reset all back to 1.0
+    weights.replaceAll((i, v) -> 1.0f);
+  }
+
+  private static final class SortByDescendingLoad
+      implements Comparator<SubClusterId> {
+
+    private Map<SubClusterId, ClusterMetricsInfo> clusterMetrics;
+
+    private SortByDescendingLoad(
+        Map<SubClusterId, ClusterMetricsInfo> clusterMetrics) {
+      this.clusterMetrics = clusterMetrics;
+    }
+
+    public int compare(SubClusterId a, SubClusterId b) {
+      // Sort by pending load
+      return clusterMetrics.get(b).getAppsPending() - clusterMetrics.get(a)
+          .getAppsPending();
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java
index 33501fb1e3e..3c94d6576e7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java
@@ -129,15 +129,15 @@ public class PolicyGenerator implements Runnable, Configurable {
       try {
        manager = this.gpgContext.getPolicyFacade().getPolicyManager(queueName);
       } catch (YarnException e) {
-        LOG.error("GetPolicy for queue {} failed", queueName, e);
+        LOG.error("GetPolicy for queue {} failed.", queueName, e);
         continue;
       }
-      LOG.info("Updating policy for queue {}", queueName);
+      LOG.info("Updating policy for queue {}.", queueName);
       manager = policy.updatePolicy(queueName, clusterInfo, manager);
       try {
         this.gpgContext.getPolicyFacade().setPolicyManager(manager);
       } catch (YarnException e) {
-        LOG.error("SetPolicy for queue {} failed", queueName, e);
+        LOG.error("SetPolicy for queue {} failed.", queueName, e);
       }
     }
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestLoadBasedGlobalPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestLoadBasedGlobalPolicy.java
new file mode 100644
index 00000000000..df58b30aaaa
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestLoadBasedGlobalPolicy.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.FEDERATION_GPG_LOAD_BASED_MAX_EDIT;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.FEDERATION_GPG_LOAD_BASED_MIN_PENDING;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.FEDERATION_GPG_LOAD_BASED_MAX_PENDING;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.FEDERATION_GPG_LOAD_BASED_SCALING;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_FEDERATION_GPG_LOAD_BASED_SCALING;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit test for the Load Based Global Policy.
+ */
+public class TestLoadBasedGlobalPolicy {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestLoadBasedGlobalPolicy.class);
+
+  private static final int NUM_SC = 3;
+  private static final float DELTA = 0.00001f;
+
+  private static final int MIN_PENDING = 100;
+  private static final int MAX_PENDING = 500;
+
+  private List<SubClusterId> subClusterIds;
+  private Map<SubClusterId, ClusterMetricsInfo> clusterMetricsInfos;
+  private Map<SubClusterIdInfo, Float> weights;
+
+  private final Configuration conf;
+  private final LoadBasedGlobalPolicy policyGenerator;
+
+  public TestLoadBasedGlobalPolicy() {
+    conf = new Configuration();
+    policyGenerator = new LoadBasedGlobalPolicy();
+  }
+
+  @Before
+  public void setUp() {
+
+    conf.setInt(FEDERATION_GPG_LOAD_BASED_MAX_EDIT, 2);
+    conf.setInt(FEDERATION_GPG_LOAD_BASED_MIN_PENDING, MIN_PENDING);
+    conf.setInt(FEDERATION_GPG_LOAD_BASED_MAX_PENDING, MAX_PENDING);
+    conf.setFloat(FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT, 0.0f);
+    conf.set(FEDERATION_GPG_LOAD_BASED_SCALING, LoadBasedGlobalPolicy.Scaling.LINEAR.name());
+    policyGenerator.setConf(conf);
+
+    subClusterIds = new ArrayList<>();
+    clusterMetricsInfos = new HashMap<>();
+    // Set up sub clusters
+    for (int i = 0; i < NUM_SC; ++i) {
+      // subClusterId
+      SubClusterId id = SubClusterId.newInstance("sc" + i);
+      subClusterIds.add(id);
+
+      // Cluster metrics info
+      ClusterMetricsInfo metricsInfo = new ClusterMetricsInfo();
+      metricsInfo.setAppsPending(50);
+      clusterMetricsInfos.put(id, metricsInfo);
+    }
+  }
+
+  @Test
+  public void testSimpleTargetWeights() {
+    weights = policyGenerator.getTargetWeights(clusterMetricsInfos);
+    assertEquals(weights.size(), 3);
+    assertEquals(1.0, getWeight(0), DELTA);
+    assertEquals(1.0, getWeight(1), DELTA);
+    assertEquals(1.0, getWeight(2), DELTA);
+  }
+
+  @Test
+  public void testLoadTargetWeights() {
+    getMetric(0).setAppsPending(100);
+    weights = policyGenerator.getTargetWeights(clusterMetricsInfos);
+    assertEquals(weights.size(), 3);
+    assertEquals(1.0, getWeight(0), DELTA);
+    assertEquals(1.0, getWeight(1), DELTA);
+    assertEquals(1.0, getWeight(2), DELTA);
+    getMetric(0).setAppsPending(500);
+    weights = policyGenerator.getTargetWeights(clusterMetricsInfos);
+    assertEquals(weights.size(), 3);
+    assertEquals(0.0, getWeight(0), DELTA);
+    assertEquals(1.0, getWeight(1), DELTA);
+    assertEquals(1.0, getWeight(2), DELTA);
+  }
+
+  @Test
+  public void testMaxEdit() {
+    // The policy should be able to edit 2 weights
+    getMetric(0).setAppsPending(MAX_PENDING + 200);
+    getMetric(1).setAppsPending(MAX_PENDING + 100);
+    weights = policyGenerator.getTargetWeights(clusterMetricsInfos);
+    assertEquals(weights.size(), 3);
+    assertEquals(0.0, getWeight(0), DELTA);
+    assertEquals(0.0, getWeight(1), DELTA);
+    assertEquals(1.0, getWeight(2), DELTA);
+    // After updating the config, it should only edit the most loaded
+    conf.setInt(FEDERATION_GPG_LOAD_BASED_MAX_EDIT, 1);
+    policyGenerator.setConf(conf);
+    weights = policyGenerator.getTargetWeights(clusterMetricsInfos);
+    assertEquals(weights.size(), 3);
+    assertEquals(0.0, getWeight(0), DELTA);
+    assertEquals(1.0, getWeight(1), DELTA);
+    assertEquals(1.0, getWeight(2), DELTA);
+  }
+
+  @Test
+  public void testMinWeight() {
+    // If a minimum weight is set, the generator should not go below it
+    conf.setFloat(FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT, 0.5f);
+    policyGenerator.setConf(conf);
+    getMetric(0).setAppsPending(Integer.MAX_VALUE);
+    weights = policyGenerator.getTargetWeights(clusterMetricsInfos);
+    assertEquals(weights.size(), 3);
+    assertEquals(0.5, getWeight(0), DELTA);
+    assertEquals(1.0, getWeight(1), DELTA);
+    assertEquals(1.0, getWeight(2), DELTA);
+  }
+
+  @Test
+  public void testScaling() {
+    LOG.info("Testing that the generator weights are monotonically"
+        + " decreasing regardless of scaling method");
+    for (LoadBasedGlobalPolicy.Scaling scaling :
+        new LoadBasedGlobalPolicy.Scaling[] {LoadBasedGlobalPolicy.Scaling.LINEAR,
+            LoadBasedGlobalPolicy.Scaling.QUADRATIC, LoadBasedGlobalPolicy.Scaling.LOG }) {
+      LOG.info("Testing {} scaling...", scaling);
+      conf.set(FEDERATION_GPG_LOAD_BASED_SCALING, scaling.name());
+      policyGenerator.setConf(conf);
+      // Test a continuous range for scaling
+      float prevWeight = 1.01f;
+      for (int load = 0; load < MAX_PENDING * 2; ++load) {
+        getMetric(0).setAppsPending(load);
+        weights = policyGenerator.getTargetWeights(clusterMetricsInfos);
+        if (load < MIN_PENDING) {
+          // Below the minimum load, it should stay 1.0f
+          assertEquals(1.0f, getWeight(0), DELTA);
+        } else if (load < MAX_PENDING) {
+          // In the specified range, the weight should consistently decrease
+          float weight = getWeight(0);
+          assertTrue(weight < prevWeight);
+          prevWeight = weight;
+        } else {
+          // Above the maximum load, it should stay 0.0f
+          assertEquals(0.0f, getWeight(0), DELTA);
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testNonZero() {
+    // If all generated weights are zero, they should be set back to one
+    conf.setFloat(FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT, 0.0f);
+    conf.setInt(FEDERATION_GPG_LOAD_BASED_MAX_EDIT, 3);
+    policyGenerator.setConf(conf);
+    getMetric(0).setAppsPending(Integer.MAX_VALUE);
+    getMetric(1).setAppsPending(Integer.MAX_VALUE);
+    getMetric(2).setAppsPending(Integer.MAX_VALUE);
+    weights = policyGenerator.getTargetWeights(clusterMetricsInfos);
+    assertEquals(weights.size(), 3);
+    assertEquals(1.0, getWeight(0), DELTA);
+    assertEquals(1.0, getWeight(1), DELTA);
+    assertEquals(1.0, getWeight(2), DELTA);
+  }
+
+  private float getWeight(int sc) {
+    return weights.get(new SubClusterIdInfo(subClusterIds.get(sc)));
+  }
+
+  private ClusterMetricsInfo getMetric(int sc) {
+    return clusterMetricsInfos.get(subClusterIds.get(sc));
+  }
+}
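
For reference, here is a small standalone sketch (not part of the patch above) that works through the scaling arithmetic documented in yarn-default.xml and in LoadBasedGlobalPolicy#getWeightByScaling. The ScalingDemo class name and the sample pending counts are illustrative only; the formulas mirror the ones introduced by this commit.

// Standalone illustration (not part of this patch) of the load-based scaling.
// The class name and sample values below are hypothetical.
public final class ScalingDemo {

  enum Scaling { NONE, LINEAR, QUADRATIC, LOG }

  // maxPendingVal = maxPending - minPending, curPendingVal = pending - minPending
  static float weightByScaling(Scaling scaling, int maxPendingVal, int curPendingVal) {
    switch (scaling) {
    case LINEAR:
      // Straight-line interpolation from 1.0 (no excess pending) down to 0.0.
      return (float) (maxPendingVal - curPendingVal) / (float) maxPendingVal;
    case QUADRATIC:
      // Square the values first, then apply the same (max - cur) / max formula.
      double maxQ = Math.pow(maxPendingVal, 2);
      double curQ = Math.pow(curPendingVal, 2);
      return (float) ((maxQ - curQ) / maxQ);
    case LOG:
      // Take logarithms first, then apply the same (max - cur) / max formula.
      double maxL = Math.log(maxPendingVal);
      double curL = Math.log(curPendingVal);
      return (float) ((maxL - curL) / maxL);
    default:
      // NONE: the weight stays at 1.
      return 1.0f;
    }
  }

  public static void main(String[] args) {
    // Hypothetical subCluster with 550 pending apps, pending.minimum=100, pending.maximum=1000.
    int minPending = 100;
    int maxPending = 1000;
    int pending = 550;
    int maxVal = maxPending - minPending; // 900
    int curVal = pending - minPending;    // 450
    for (Scaling s : Scaling.values()) {
      System.out.printf("%-9s -> %.3f%n", s, weightByScaling(s, maxVal, curVal));
    }
    // Roughly: NONE -> 1.000, LINEAR -> 0.500, QUADRATIC -> 0.750, LOG -> 0.102
  }
}

LoadBasedGlobalPolicy#getTargetWeights then rescales this value as weight * (1 - minWeight) + minWeight, so a heavily loaded subCluster bottoms out at the configured weight.minimum rather than at zero.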

