chickenlj closed pull request #1144: [Dubbo-#1123] Fix several problems in
ConsistentHashLoadBalance
URL: https://github.com/apache/incubator-dubbo/pull/1144
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalance.java
b/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalance.java
index 44c751d9a2..e068e6ba49 100644
---
a/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalance.java
+++
b/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalance.java
@@ -16,44 +16,78 @@
*/
package com.alibaba.dubbo.rpc.cluster.loadbalance;
-import com.alibaba.dubbo.common.Constants;
-import com.alibaba.dubbo.common.URL;
-import com.alibaba.dubbo.rpc.Invocation;
-import com.alibaba.dubbo.rpc.Invoker;
-
import java.io.UnsupportedEncodingException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import com.alibaba.dubbo.common.Constants;
+import com.alibaba.dubbo.common.URL;
+import com.alibaba.dubbo.rpc.Invocation;
+import com.alibaba.dubbo.rpc.Invoker;
+import com.alibaba.dubbo.rpc.support.RpcUtils;
+
/**
* ConsistentHashLoadBalance
- *
+ *
+ * After fixed, the ConsistentHashLoadBalance will surpport to used in
GenericService,
+ * and it will reduce the times of rebuilding hash cycle when the provider
restart
+ * but nothing to change
+ *
*/
public class ConsistentHashLoadBalance extends AbstractLoadBalance {
+
+ public static final String NAME = "consistenthash";
private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors =
new ConcurrentHashMap<String, ConsistentHashSelector<?>>();
- @SuppressWarnings("unchecked")
@Override
+ @SuppressWarnings("unchecked")
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url,
Invocation invocation) {
- String key = invokers.get(0).getUrl().getServiceKey() + "." +
invocation.getMethodName();
- int identityHashCode = System.identityHashCode(invokers);
+ String methodName = RpcUtils.getMethodName(invocation);
+ String key = invokers.get(0).getUrl().getServiceKey() + "." +
methodName;
+ Map<String, Invoker<T>> invokerMap = new HashMap<String, Invoker<T>>();
+ int identityHashCode = caculateInvokerHashCode(invokers, invokerMap);
ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>)
selectors.get(key);
if (selector == null || selector.identityHashCode != identityHashCode)
{
- selectors.put(key, new ConsistentHashSelector<T>(invokers,
invocation.getMethodName(), identityHashCode));
+ selectors.put(key, new ConsistentHashSelector<T>(invokers,
methodName, identityHashCode));
selector = (ConsistentHashSelector<T>) selectors.get(key);
}
- return selector.select(invocation);
+ return selector.select(invocation, invokerMap);
+ }
+
+ /**
+ * Generate hashcode according to the invoker list.
+ * the value only changes when adding or reducing the invoker.
+ *
+ * The method use System.identityHashCode(invokers) to generate
+ * hashcode in past.The result of method executed changes everytime,
+ * because the timestamp of invoker alse be solved.
+ *
+ * Now if the providers are only restart but nothing to change,
+ * the value also won't be changed;
+ * @param invokers
+ * @return the hashcode of invoker addresses
+ */
+ private <T> int caculateInvokerHashCode(List<Invoker<T>> invokers,
Map<String, Invoker<T>> invokerMap) {
+ StringBuilder metakey = new StringBuilder();
+ for (Invoker<T> obj : invokers) {
+ String address = obj.getUrl().getAddress();
+ metakey.append(address);
+ invokerMap.put(address, obj);
+ }
+ return metakey.toString().hashCode();
}
private static final class ConsistentHashSelector<T> {
- private final TreeMap<Long, Invoker<T>> virtualInvokers;
+ private final TreeMap<Long, String> virtualInvokers;
private final int replicaNumber;
@@ -62,7 +96,7 @@
private final int[] argumentIndex;
ConsistentHashSelector(List<Invoker<T>> invokers, String methodName,
int identityHashCode) {
- this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
+ this.virtualInvokers = new TreeMap<Long, String>();
this.identityHashCode = identityHashCode;
URL url = invokers.get(0).getUrl();
this.replicaNumber = url.getMethodParameter(methodName,
"hash.nodes", 160);
@@ -77,34 +111,49 @@
byte[] digest = md5(address + i);
for (int h = 0; h < 4; h++) {
long m = hash(digest, h);
- virtualInvokers.put(m, invoker);
+ // here not put the object reference into the map,
only put the address
+ virtualInvokers.put(m, address.intern());
}
}
}
}
- public Invoker<T> select(Invocation invocation) {
- String key = toKey(invocation.getArguments());
+ public Invoker<T> select(Invocation invocation, Map<String,
Invoker<T>> invokerMap) {
+ Object[] param = RpcUtils.getArguments(invocation);
+ String key = toKey(param);
byte[] digest = md5(key);
- return selectForKey(hash(digest, 0));
+ String invokerAddress = selectForKey(hash(digest, 0));
+ return selectInvoker(invokerAddress, invokerMap);
}
private String toKey(Object[] args) {
StringBuilder buf = new StringBuilder();
for (int i : argumentIndex) {
if (i >= 0 && i < args.length) {
+ // find a problem here, when args[i] is an array,
+ // this method is not stable. Because the value of args[i]
+ // may be the memory address.
buf.append(args[i]);
}
}
return buf.toString();
}
- private Invoker<T> selectForKey(long hash) {
- Map.Entry<Long, Invoker<T>> entry = virtualInvokers.tailMap(hash,
true).firstEntry();
- if (entry == null) {
- entry = virtualInvokers.firstEntry();
- }
- return entry.getValue();
+ private String selectForKey(long hash) {
+ Long key = hash;
+ if (!virtualInvokers.containsKey(key)) {
+ SortedMap<Long, String> tailMap = virtualInvokers.tailMap(key);
+ if (tailMap.isEmpty()) {
+ key = virtualInvokers.firstKey();
+ } else {
+ key = tailMap.firstKey();
+ }
+ }
+ return virtualInvokers.get(key);
+ }
+
+ private Invoker<T> selectInvoker(String invokerAddress, Map<String,
Invoker<T>> invokerMap) {
+ return invokerMap.get(invokerAddress);
}
private long hash(byte[] digest, int number) {
diff --git
a/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/support/AbstractClusterInvoker.java
b/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/support/AbstractClusterInvoker.java
index 0a3e0a5120..12d687434d 100644
---
a/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/support/AbstractClusterInvoker.java
+++
b/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/support/AbstractClusterInvoker.java
@@ -226,15 +226,18 @@ public Result invoke(final Invocation invocation) throws
RpcException {
checkWhetherDestroyed();
LoadBalance loadbalance = null;
List<Invoker<T>> invokers = list(invocation);
+ String methodName = RpcUtils.getMethodName(invocation);
if (invokers != null && !invokers.isEmpty()) {
loadbalance =
ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
- .getMethodParameter(invocation.getMethodName(),
Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
+ .getMethodParameter(methodName, Constants.LOADBALANCE_KEY,
Constants.DEFAULT_LOADBALANCE));
+ } else {
+ loadbalance =
ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);
}
- protected void checkWhetherDestroyed() {
+ protected void checkWhetherDestroyed() {
if (destroyed.get()) {
throw new RpcException("Rpc cluster invoker for " + getInterface()
+ " on consumer " + NetUtils.getLocalHost()
diff --git
a/dubbo-test/dubbo-test-examples/src/main/java/com/alibaba/dubbo/examples/generic/api/IService.java
b/dubbo-test/dubbo-test-examples/src/main/java/com/alibaba/dubbo/examples/generic/api/IService.java
index 44e3d69c0e..3972c9218c 100644
---
a/dubbo-test/dubbo-test-examples/src/main/java/com/alibaba/dubbo/examples/generic/api/IService.java
+++
b/dubbo-test/dubbo-test-examples/src/main/java/com/alibaba/dubbo/examples/generic/api/IService.java
@@ -18,4 +18,5 @@
public interface IService<P, V> {
V get(P params);
+
}
diff --git
a/dubbo-test/dubbo-test-examples/src/main/java/com/alibaba/dubbo/examples/generic/impl/UserServiceImpl.java
b/dubbo-test/dubbo-test-examples/src/main/java/com/alibaba/dubbo/examples/generic/impl/UserServiceImpl.java
index 92cf923137..f4e6f079ff 100644
---
a/dubbo-test/dubbo-test-examples/src/main/java/com/alibaba/dubbo/examples/generic/impl/UserServiceImpl.java
+++
b/dubbo-test/dubbo-test-examples/src/main/java/com/alibaba/dubbo/examples/generic/impl/UserServiceImpl.java
@@ -24,4 +24,5 @@
public User get(Params params) {
return new User(1, "charles");
}
+
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]