> Hi, > The `RoundRobinLoadBalance Algorithm` is not smoothing. > For example, assuming that the weight of **Server A** is five, the weight of > **Server B** is one, the weight of **Server C** is one. > When calling the service seven times, the responding order is **[A ,A, A, A, > A, B, C]**. > By this responding order, we can find that the first five callings point to > Server A, it will cause Server A to be overloaded in a short period of time. > Ideally, the call order would be **[A ,A, B, A, C, A, A]**. > So, I think there may be a better way to do this.
Hi, all I have another implementation of SMOOTH WRR including concurrent locking, test code is below: ```java package com.yoloho.rocketmq.consumers.demo; import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import com.alibaba.dubbo.common.utils.AtomicPositiveInteger; import com.alibaba.fastjson.JSON; import com.google.common.collect.Lists; public final class DemoDubbo { /** * 假设该接口有10个可用的Invoker */ private static final int[] INVOKER_WEIGHT_ARRAY = new int[]{100, 100, 200, 200, 300, 300, 400, 400, 500, 500}; private static final String SERVICE_KEY = "com.test.Test.testMethod"; private static final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>(); private static final ConcurrentMap<String, AtomicPositiveInteger> weightSequences = new ConcurrentHashMap<String, AtomicPositiveInteger>(); public static void main(String[] args) { AtomicInteger a; int times = 1000000; int[] selectArray = new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; long start = System.nanoTime(); sequences.clear(); weightSequences.clear(); while (times-- > 0) { int select = currentSelect(); selectArray[select]++; } System.out.println("最新dubbo的RoundRobinLoadBalance耗时:" + (System.nanoTime() - start) / 1000000); System.out.println("最新dubbo的RoundRobinLoadBalance流量分布:" + JSON.toJSONString(selectArray)); times = 1000000; selectArray = new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; start = System.nanoTime(); sequences.clear(); weightSequences.clear(); while (times-- > 0) { int select = oldSelect(); selectArray[select]++; } System.out.println("dubbo-2.5.3的RoundRobinLoadBalance耗时:" + (System.nanoTime() - start) / 1000000); System.out.println("dubbo-2.5.3的RoundRobinLoadBalance流量分布:" + JSON.toJSONString(selectArray)); times = 1000000; selectArray = new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; start = System.nanoTime(); sequences.clear(); weightSequences.clear(); while (times-- > 0) { int select = oldRandomSelect(); selectArray[select]++; } System.out.println("dubbo-2.5.3的RandomLoadBalance耗时:" + (System.nanoTime() - start) / 1000000); System.out.println("dubbo-2.5.3的RandomLoadBalance流量分布:" + JSON.toJSONString(selectArray)); times = 1000000; selectArray = new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; start = System.nanoTime(); while (times-- > 0) { int select = wrrSelect(); selectArray[select]++; } System.out.println("smooth的wrr耗时:" + (System.nanoTime() - start) / 1000000); System.out.println("smooth的wrr流量分布:" + JSON.toJSONString(selectArray)); } /** * 当前最新版本dubbo master分支中实现方式 * * @return 选择的invoker的index */ private static int currentSelect() { // 为了测试方便,key默认写死 String key = SERVICE_KEY; // invoker默认是10个 int length = INVOKER_WEIGHT_ARRAY.length; // Number of invokers int maxWeight = 0; // The maximum weight int minWeight = Integer.MAX_VALUE; // The minimum weight final LinkedHashMap<Integer, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Integer, IntegerWrapper>(); int weightSum = 0; for (int i = 0; i < length; i++) { int weight = getWeight(i); maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight minWeight = Math.min(minWeight, weight); // Choose the minimum weight if (weight > 0) { invokerToWeightMap.put(i, new IntegerWrapper(weight)); weightSum += weight; } } AtomicPositiveInteger sequence = sequences.get(key); if (sequence == null) { sequences.putIfAbsent(key, new AtomicPositiveInteger()); sequence = sequences.get(key); } int currentSequence = sequence.getAndIncrement(); if (maxWeight > 0 && minWeight < maxWeight) { int mod = currentSequence % weightSum; for (int i = 0; i < maxWeight; i++) { for (Map.Entry<Integer, IntegerWrapper> each : invokerToWeightMap.entrySet()) { final Integer k = each.getKey(); final IntegerWrapper v = each.getValue(); if (mod == 0 && v.getValue() > 0) { return k; } if (v.getValue() > 0) { v.decrement(); mod--; } } } } // Round robin return currentSequence % length; } /** * 2.5.3版本的roundrobin方式 * * @return */ private static int oldSelect() { // 为了测试方便,key默认写死 String key = SERVICE_KEY; // invoker默认是10个 int length = INVOKER_WEIGHT_ARRAY.length; // Number of invokers List<Integer> invokers = Lists.newArrayList(); int maxWeight = 0; // 最大权重 int minWeight = Integer.MAX_VALUE; // 最小权重 for (int i = 0; i < length; i++) { int weight = getWeight(i); maxWeight = Math.max(maxWeight, weight); // 累计最大权重 minWeight = Math.min(minWeight, weight); // 累计最小权重 } if (maxWeight > 0 && minWeight < maxWeight) { // 权重不一样 AtomicPositiveInteger weightSequence = weightSequences.get(key); if (weightSequence == null) { weightSequences.putIfAbsent(key, new AtomicPositiveInteger()); weightSequence = weightSequences.get(key); } int currentWeight = weightSequence.getAndIncrement() % maxWeight; List<Integer> weightInvokers = new ArrayList<Integer>(); for (int i = 0; i < length; i++) { // 筛选权重大于当前权重基数的Invoker if (getWeight(i) > currentWeight) { weightInvokers.add(i); } } int weightLength = weightInvokers.size(); if (weightLength == 1) { return weightInvokers.get(0); } else if (weightLength > 1) { invokers = weightInvokers; length = invokers.size(); } } AtomicPositiveInteger sequence = sequences.get(key); if (sequence == null) { sequences.putIfAbsent(key, new AtomicPositiveInteger()); sequence = sequences.get(key); } // 取模轮循 return invokers.get(sequence.getAndIncrement() % length); } /** * 2.5.3版本的random方式 * * @return */ private static int oldRandomSelect() { // 为了测试方便,key默认写死 String key = SERVICE_KEY; // invoker默认是10个 int length = INVOKER_WEIGHT_ARRAY.length; // Number of invokers int totalWeight = 0; // 总权重 boolean sameWeight = true; // 权重是否都一样 for (int i = 0; i < length; i++) { int weight = getWeight(i); totalWeight += weight; // 累计总权重 if (sameWeight && i > 0 && weight != getWeight(i - 1)) { sameWeight = false; // 计算所有权重是否一样 } } if (totalWeight > 0 && !sameWeight) { // 如果权重不相同且权重大于0则按总权重数随机 int offset = ThreadLocalRandom.current().nextInt(totalWeight); // 并确定随机值落在哪个片断上 for (int i = 0; i < length; i++) { offset -= getWeight(i); if (offset < 0) { return i; } } } // 如果权重相同或权重为0则均等随机 return ThreadLocalRandom.current().nextInt(length); } private static int getWeight(int invokerIndex) { return INVOKER_WEIGHT_ARRAY[invokerIndex]; } private static final class IntegerWrapper { private int value; public IntegerWrapper(int value) { this.value = value; } public int getValue() { return value; } public void setValue(int value) { this.value = value; } public void decrement() { this.value--; } } private static class WeightedRoundRobin { private int weight; private int current; public int getWeight() { return weight; } public void setWeight(int weight) { this.weight = weight; } public void setCurrent(int current) { this.current = current; } public int increaseWeight() { current += weight; return current; } public void sel(int total) { current -= total; } } private final static ConcurrentMap<String, Map<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, Map<String, WeightedRoundRobin>>(); private static Map<String, WeightedRoundRobin> getEntry(String key) { Map<String, WeightedRoundRobin> map = methodWeightMap.get(key); if (map == null) { methodWeightMap.putIfAbsent(key, new HashMap<String, WeightedRoundRobin>()); map = methodWeightMap.get(key); } return map; } private static int wrrSelect() { // 为了测试方便,key默认写死 String key = SERVICE_KEY; Map<String, WeightedRoundRobin> map = getEntry(key); synchronized (map) { int totalWeight = 0; int maxCurrent = Integer.MIN_VALUE; int selectedInvoker = -1; WeightedRoundRobin selectedWRR = null; for (int i = 0; i < INVOKER_WEIGHT_ARRAY.length; i++) { String addr = "server:" + i; WeightedRoundRobin weightedRoundRobin = map.get(addr); int weight = INVOKER_WEIGHT_ARRAY[i]; if (weight < 0) { weight = 0; } if (weightedRoundRobin == null) { weightedRoundRobin = new WeightedRoundRobin(); weightedRoundRobin.setCurrent(0); weightedRoundRobin.setWeight(weight); map.put(addr, weightedRoundRobin); } if (weight != weightedRoundRobin.getWeight()) { //weight changed weightedRoundRobin.setCurrent(0); weightedRoundRobin.setWeight(weight); } int cur = weightedRoundRobin.increaseWeight(); if (cur > maxCurrent) { maxCurrent = cur; selectedInvoker = i; selectedWRR = weightedRoundRobin; } totalWeight += weight; } if (selectedInvoker >= 0) { selectedWRR.sel(totalWeight); return selectedInvoker; } } return -1; } } ``` Result: ``` 最新dubbo的RoundRobinLoadBalance耗时:8975 最新dubbo的RoundRobinLoadBalance流量分布:[33400,33400,66700,66700,100000,100000,133300,133300,166600,166600] dubbo-2.5.3的RoundRobinLoadBalance耗时:152 dubbo-2.5.3的RoundRobinLoadBalance流量分布:[20000,20000,45000,45000,78333,78333,128333,128333,228334,228334] dubbo-2.5.3的RandomLoadBalance耗时:45 dubbo-2.5.3的RandomLoadBalance流量分布:[33279,33289,67213,66143,99651,99970,133518,133463,167300,166174] smooth的wrr耗时:390 smooth的wrr流量分布:[33333,33333,66667,66667,100000,100000,133333,133333,166667,166667] ``` Is it acceptable? Time cost: **390** ms Code from nginx source code. And a cleaner block should be added to clean offline nodes in map(typically not performance sensitive). [ Full content available at: https://github.com/apache/incubator-dubbo/issues/2578 ] This message was relayed via gitbox.apache.org for notifications@dubbo.apache.org
--------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@dubbo.apache.org For additional commands, e-mail: notifications-h...@dubbo.apache.org