This is an automated email from the ASF dual-hosted git repository. mercyblitz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/master by this push: new 6d2ba7e add new loadbalance strategy (#6064) 6d2ba7e is described below commit 6d2ba7ec7b5a1cb7971143d4262d0a1bfc826d45 Author: August <33082727+august...@users.noreply.github.com> AuthorDate: Wed May 6 16:47:22 2020 +0800 add new loadbalance strategy (#6064) * add new loadbalance strategy * add note * Update ShortestResponseLoadBalanceTest.java --- .../loadbalance/ShortestResponseLoadBalance.java | 100 +++++++++++++++++++++ .../org.apache.dubbo.rpc.cluster.LoadBalance | 3 +- .../cluster/loadbalance/LoadBalanceBaseTest.java | 22 +++++ .../ShortestResponseLoadBalanceTest.java | 53 +++++++++++ 4 files changed, 177 insertions(+), 1 deletion(-) diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/ShortestResponseLoadBalance.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/ShortestResponseLoadBalance.java new file mode 100644 index 0000000..610b1b4 --- /dev/null +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/ShortestResponseLoadBalance.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.dubbo.rpc.cluster.loadbalance; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.RpcStatus; + +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +/** + * ShortestResponseLoadBalance + * <p> + * Filter the number of invokers with the shortest response time of success calls and count the weights and quantities of these invokers. + * If there is only one invoker, use the invoker directly; + * if there are multiple invokers and the weights are not the same, then random according to the total weight; + * if there are multiple invokers and the same weight, then randomly called. + */ +public class ShortestResponseLoadBalance extends AbstractLoadBalance { + + public static final String NAME = "shortestresponse"; + + @Override + protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { + // Number of invokers + int length = invokers.size(); + // Estimated shortest response time of all invokers + long shortestResponse = Long.MAX_VALUE; + // The number of invokers having the same estimated shortest response time + int shortestCount = 0; + // The index of invokers having the same estimated shortest response time + int[] shortestIndexes = new int[length]; + // the weight of every invokers + int[] weights = new int[length]; + // The sum of the warmup weights of all the shortest response invokers + int totalWeight = 0; + // The weight of the first shortest response invokers + int firstWeight = 0; + // Every shortest response invoker has the same weight value? 
+ boolean sameWeight = true; + + // Filter out all the shortest response invokers + for (int i = 0; i < length; i++) { + Invoker<T> invoker = invokers.get(i); + RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()); + // Calculate the estimated response time from the product of active connections and succeeded average elapsed time. + long succeededAverageElapsed = rpcStatus.getSucceededAverageElapsed(); + int active = rpcStatus.getActive(); + long estimateResponse = succeededAverageElapsed * active; + int afterWarmup = getWeight(invoker, invocation); + weights[i] = afterWarmup; + // Same as LeastActiveLoadBalance + if (estimateResponse < shortestResponse) { + shortestResponse = estimateResponse; + shortestCount = 1; + shortestIndexes[0] = i; + totalWeight = afterWarmup; + firstWeight = afterWarmup; + sameWeight = true; + } else if (estimateResponse == shortestResponse) { + shortestIndexes[shortestCount++] = i; + totalWeight += afterWarmup; + if (sameWeight && i > 0 + && afterWarmup != firstWeight) { + sameWeight = false; + } + } + } + if (shortestCount == 1) { + return invokers.get(shortestIndexes[0]); + } + if (!sameWeight && totalWeight > 0) { + int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight); + for (int i = 0; i < shortestCount; i++) { + int shortestIndex = shortestIndexes[i]; + offsetWeight -= weights[shortestIndex]; + if (offsetWeight < 0) { + return invokers.get(shortestIndex); + } + } + } + return invokers.get(shortestIndexes[ThreadLocalRandom.current().nextInt(shortestCount)]); + } +} diff --git a/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.LoadBalance b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.LoadBalance index 69e1cff..8a636a9 100644 --- a/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.LoadBalance +++ 
b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.LoadBalance @@ -1,4 +1,5 @@ random=org.apache.dubbo.rpc.cluster.loadbalance.RandomLoadBalance roundrobin=org.apache.dubbo.rpc.cluster.loadbalance.RoundRobinLoadBalance leastactive=org.apache.dubbo.rpc.cluster.loadbalance.LeastActiveLoadBalance -consistenthash=org.apache.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance \ No newline at end of file +consistenthash=org.apache.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance +shortestresponse=org.apache.dubbo.rpc.cluster.loadbalance.ShortestResponseLoadBalance \ No newline at end of file diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/LoadBalanceBaseTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/LoadBalanceBaseTest.java index fef733a..f9ab3ea 100644 --- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/LoadBalanceBaseTest.java +++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/LoadBalanceBaseTest.java @@ -58,6 +58,7 @@ public class LoadBalanceBaseTest { RpcStatus weightTestRpcStatus1; RpcStatus weightTestRpcStatus2; RpcStatus weightTestRpcStatus3; + RpcStatus weightTestRpcStatus5; RpcInvocation weightTestInvocation; @@ -205,10 +206,13 @@ public class LoadBalanceBaseTest { } protected List<Invoker<LoadBalanceBaseTest>> weightInvokers = new ArrayList<Invoker<LoadBalanceBaseTest>>(); + protected List<Invoker<LoadBalanceBaseTest>> weightInvokersSR= new ArrayList<Invoker<LoadBalanceBaseTest>>(); + protected Invoker<LoadBalanceBaseTest> weightInvoker1; protected Invoker<LoadBalanceBaseTest> weightInvoker2; protected Invoker<LoadBalanceBaseTest> weightInvoker3; protected Invoker<LoadBalanceBaseTest> weightInvokerTmp; + protected Invoker<LoadBalanceBaseTest> weightInvoker5; @BeforeEach public void before() throws Exception { @@ -216,6 +220,7 @@ public class LoadBalanceBaseTest { weightInvoker2 = mock(Invoker.class, 
Mockito.withSettings().stubOnly()); weightInvoker3 = mock(Invoker.class, Mockito.withSettings().stubOnly()); weightInvokerTmp = mock(Invoker.class, Mockito.withSettings().stubOnly()); + weightInvoker5 = mock(Invoker.class, Mockito.withSettings().stubOnly()); weightTestInvocation = new RpcInvocation(); weightTestInvocation.setMethodName("test"); @@ -224,6 +229,7 @@ public class LoadBalanceBaseTest { URL url2 = URL.valueOf("test2://127.0.0.1:12/DemoService?weight=9&active=0"); URL url3 = URL.valueOf("test3://127.0.0.1:13/DemoService?weight=6&active=1"); URL urlTmp = URL.valueOf("test4://127.0.0.1:9999/DemoService?weight=11&active=0"); + URL url5 = URL.valueOf("test5://127.0.0.1:15/DemoService?weight=15&active=0"); given(weightInvoker1.isAvailable()).willReturn(true); given(weightInvoker1.getInterface()).willReturn(LoadBalanceBaseTest.class); @@ -241,16 +247,32 @@ public class LoadBalanceBaseTest { given(weightInvokerTmp.getInterface()).willReturn(LoadBalanceBaseTest.class); given(weightInvokerTmp.getUrl()).willReturn(urlTmp); + given(weightInvoker5.isAvailable()).willReturn(true); + given(weightInvoker5.getInterface()).willReturn(LoadBalanceBaseTest.class); + given(weightInvoker5.getUrl()).willReturn(url5); + weightInvokers.add(weightInvoker1); weightInvokers.add(weightInvoker2); weightInvokers.add(weightInvoker3); + weightInvokersSR.add(weightInvoker1); + weightInvokersSR.add(weightInvoker2); + weightInvokersSR.add(weightInvoker5); + weightTestRpcStatus1 = RpcStatus.getStatus(weightInvoker1.getUrl(), weightTestInvocation.getMethodName()); weightTestRpcStatus2 = RpcStatus.getStatus(weightInvoker2.getUrl(), weightTestInvocation.getMethodName()); weightTestRpcStatus3 = RpcStatus.getStatus(weightInvoker3.getUrl(), weightTestInvocation.getMethodName()); + weightTestRpcStatus5 = RpcStatus.getStatus(weightInvoker5.getUrl(), weightTestInvocation.getMethodName()); + // weightTestRpcStatus3 active is 1 RpcStatus.beginCount(weightInvoker3.getUrl(), 
weightTestInvocation.getMethodName()); + + // weightTestRpcStatus5 shortest response time of success calls is bigger than 0 + // weightTestRpcStatus5 active is 1 + RpcStatus.beginCount(weightInvoker5.getUrl(), weightTestInvocation.getMethodName()); + RpcStatus.endCount(weightInvoker5.getUrl(), weightTestInvocation.getMethodName(), 5000L, true); + RpcStatus.beginCount(weightInvoker5.getUrl(), weightTestInvocation.getMethodName()); } protected Map<Invoker, InvokeResult> getWeightedInvokeResult(int runs, String loadbalanceName) { diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/ShortestResponseLoadBalanceTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/ShortestResponseLoadBalanceTest.java new file mode 100644 index 0000000..86814ae --- /dev/null +++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/ShortestResponseLoadBalanceTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.dubbo.rpc.cluster.loadbalance; + +import org.apache.dubbo.rpc.Invoker; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class ShortestResponseLoadBalanceTest extends LoadBalanceBaseTest{ + + @Test + public void testSelectByWeight() { + int sumInvoker1 = 0; + int sumInvoker2 = 0; + int loop = 10000; + + ShortestResponseLoadBalance lb = new ShortestResponseLoadBalance(); + for (int i = 0; i < loop; i++) { + Invoker selected = lb.select(weightInvokersSR, null, weightTestInvocation); + + if (selected.getUrl().getProtocol().equals("test1")) { + sumInvoker1++; + } + + if (selected.getUrl().getProtocol().equals("test2")) { + sumInvoker2++; + } + // never select invoker5 because its estimated response time is more than invoker1 and invoker2 + Assertions.assertTrue(!selected.getUrl().getProtocol().equals("test5"), "select is not the shortest one"); + } + + // the sumInvoker1 : sumInvoker2 approximately equal to 1: 9 + System.out.println(sumInvoker1); + System.out.println(sumInvoker2); + + Assertions.assertEquals(sumInvoker1 + sumInvoker2, loop, "select failed!"); + } +}