Copilot commented on code in PR #7887:
URL: https://github.com/apache/incubator-seata/pull/7887#discussion_r2637682287
##########
discovery/seata-discovery-raft/src/main/java/org/apache/seata/discovery/registry/raft/RaftRegistryServiceImpl.java:
##########
@@ -247,16 +247,20 @@ private static String queryHttpAddress(String
clusterName, String group) {
map.put(inetSocketAddress.getHostString() +
IP_PORT_SPLIT_CHAR + inetSocketAddress.getPort(), node);
}
}
- addressList = stream.map(inetSocketAddress -> {
- String host = NetUtil.toStringHost(inetSocketAddress);
- Node node = map.get(host + IP_PORT_SPLIT_CHAR +
inetSocketAddress.getPort());
+ addressList = stream.map(instance -> {
Review Comment:
Variable [stream](1) may be null at this access because of [this](2)
assignment.
##########
discovery/seata-discovery-core/src/test/java/org/apache/seata/discovery/loadbalance/LoadBalanceTest.java:
##########
@@ -16,181 +16,229 @@
*/
package org.apache.seata.discovery.loadbalance;
+import org.apache.seata.common.metadata.ServiceInstance;
import org.apache.seata.common.rpc.RpcStatus;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
-/**
- * Created by guoyao on 2019/2/14.
- */
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
public class LoadBalanceTest {
private static final String XID = "XID";
/**
* Test random load balance select.
*
- * @param addresses the addresses
+ * @param instances the instances
*/
@ParameterizedTest
- @MethodSource("addressProvider")
- public void testRandomLoadBalance_select(List<InetSocketAddress>
addresses) {
+ @MethodSource("instanceProvider")
+ public void testRandomLoadBalance_select(List<ServiceInstance> instances)
throws Exception {
int runs = 10000;
- Map<InetSocketAddress, AtomicLong> counter = getSelectedCounter(runs,
addresses, new RandomLoadBalance());
- for (InetSocketAddress address : counter.keySet()) {
- Long count = counter.get(address).get();
- Assertions.assertTrue(count > 0, "selecte one time at last");
+ Map<ServiceInstance, AtomicLong> counter = getSelectedCounter(runs,
instances, new RandomLoadBalance());
+ for (ServiceInstance instance : counter.keySet()) {
+ Long count = counter.get(instance).get();
Review Comment:
The variable 'count' is only assigned values of primitive type and is never
'null', but it is declared with the boxed type 'Long'.
##########
discovery/seata-discovery-core/src/test/java/org/apache/seata/discovery/loadbalance/LoadBalanceTest.java:
##########
@@ -16,181 +16,229 @@
*/
package org.apache.seata.discovery.loadbalance;
+import org.apache.seata.common.metadata.ServiceInstance;
import org.apache.seata.common.rpc.RpcStatus;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
-/**
- * Created by guoyao on 2019/2/14.
- */
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
public class LoadBalanceTest {
private static final String XID = "XID";
/**
* Test random load balance select.
*
- * @param addresses the addresses
+ * @param instances the instances
*/
@ParameterizedTest
- @MethodSource("addressProvider")
- public void testRandomLoadBalance_select(List<InetSocketAddress>
addresses) {
+ @MethodSource("instanceProvider")
+ public void testRandomLoadBalance_select(List<ServiceInstance> instances)
throws Exception {
int runs = 10000;
- Map<InetSocketAddress, AtomicLong> counter = getSelectedCounter(runs,
addresses, new RandomLoadBalance());
- for (InetSocketAddress address : counter.keySet()) {
- Long count = counter.get(address).get();
- Assertions.assertTrue(count > 0, "selecte one time at last");
+ Map<ServiceInstance, AtomicLong> counter = getSelectedCounter(runs,
instances, new RandomLoadBalance());
+ for (ServiceInstance instance : counter.keySet()) {
+ Long count = counter.get(instance).get();
+ assertTrue(count > 0, "selecte one time at last");
}
}
/**
- * Test round robin load balance select.
+ * Test round-robin load balance select.
*
- * @param addresses the addresses
+ * @param instances the instances
*/
@ParameterizedTest
- @MethodSource("addressProvider")
- public void testRoundRobinLoadBalance_select(List<InetSocketAddress>
addresses) {
+ @MethodSource("instanceProvider")
+ public void testRoundRobinLoadBalance_select(List<ServiceInstance>
instances) throws Exception {
int runs = 10000;
- Map<InetSocketAddress, AtomicLong> counter = getSelectedCounter(runs,
addresses, new RoundRobinLoadBalance());
- for (InetSocketAddress address : counter.keySet()) {
- Long count = counter.get(address).get();
- Assertions.assertTrue(Math.abs(count - runs / (0f +
addresses.size())) < 1f, "abs diff shoud < 1");
+ Map<ServiceInstance, AtomicLong> counter = getSelectedCounter(runs,
instances, new RoundRobinLoadBalance());
+ for (ServiceInstance instance : counter.keySet()) {
+ Long count = counter.get(instance).get();
Review Comment:
The variable 'count' is only assigned values of primitive type and is never
'null', but it is declared with the boxed type 'Long'.
##########
discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java:
##########
@@ -617,6 +540,58 @@ private static boolean isTokenExpired() {
return System.currentTimeMillis() >= tokenExpiredTime;
}
+ /**
+ * Get a single available namingserver address with health check and load
balancing.
+ *
+ * @return a randomly selected healthy naming server address
+ */
+ public String getNamingAddr() {
+ if (namingServerAddressCache != null) {
+ return namingServerAddressCache;
+ }
+ Map<String, AtomicInteger> availableNamingserverMap = new
HashMap<>(AVAILABLE_NAMINGSERVER_MAP);
+ List<String> availableNamingserverList = new ArrayList<>();
+ for (Map.Entry<String, AtomicInteger> entry :
availableNamingserverMap.entrySet()) {
+ String namingServerAddress = entry.getKey();
+ Integer numberOfFailures = entry.getValue().get();
Review Comment:
The variable 'numberOfFailures' is only assigned values of primitive type
and is never 'null', but it is declared with the boxed type 'Integer'.
##########
discovery/seata-discovery-consul/src/test/java/org/apache/seata/discovery/registry/consul/MockConsulRegistryServiceImpl.java:
##########
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.discovery.registry.consul;
+
+import com.ecwid.consul.v1.ConsulClient;
+import com.ecwid.consul.v1.QueryParams;
+import com.ecwid.consul.v1.Response;
+import com.ecwid.consul.v1.agent.model.NewService;
+import com.ecwid.consul.v1.health.HealthServicesRequest;
+import com.ecwid.consul.v1.health.model.HealthService;
+import org.apache.seata.common.metadata.ServiceInstance;
+import org.apache.seata.common.thread.NamedThreadFactory;
+import org.apache.seata.common.util.NetUtil;
+import org.apache.seata.common.util.StringUtils;
+import org.apache.seata.config.Configuration;
+import org.apache.seata.config.ConfigurationFactory;
+import org.apache.seata.config.ConfigurationKeys;
+import org.apache.seata.config.exception.ConfigNotFoundException;
+import org.apache.seata.discovery.registry.RegistryHeartBeats;
+import org.apache.seata.discovery.registry.RegistryService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Mock implementation of ConsulRegistryServiceImpl for testing purposes.
+ * Uses TTL checks instead of TCP checks to avoid connection issues in test
environment.
+ */
+public class MockConsulRegistryServiceImpl implements
RegistryService<ConsulListener> {
+
+ private static volatile MockConsulRegistryServiceImpl instance;
+ private static volatile ConsulClient client;
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MockConsulRegistryServiceImpl.class);
+ private static final Configuration FILE_CONFIG =
ConfigurationFactory.CURRENT_FILE_INSTANCE;
+ private static final String FILE_ROOT_REGISTRY = "registry";
+ private static final String FILE_CONFIG_SPLIT_CHAR = ".";
+ private static final String REGISTRY_TYPE = "consul";
+ private static final String SERVER_ADDR_KEY = "serverAddr";
+ private static final String REGISTRY_CLUSTER = "cluster";
+ private static final String DEFAULT_CLUSTER_NAME = "default";
+ private static final String SERVICE_TAG = "services";
+ private static final String ACL_TOKEN = "aclToken";
+ private static final String FILE_CONFIG_KEY_PREFIX =
+ FILE_ROOT_REGISTRY + FILE_CONFIG_SPLIT_CHAR + REGISTRY_TYPE +
FILE_CONFIG_SPLIT_CHAR;
+
+ private ConcurrentMap<String, List<ServiceInstance>> clusterAddressMap;
+ private ConcurrentMap<String, Set<ConsulListener>> listenerMap;
+ private ExecutorService notifierExecutor;
+ private ConcurrentMap<String, ConsulNotifier> notifiers;
+
+ private static final int THREAD_POOL_NUM = 1;
+ private static final int MAP_INITIAL_CAPACITY = 8;
+
+ private String transactionServiceGroup;
+
+ /**
+ * default deregister critical server after
+ */
+ private static final String DEFAULT_DEREGISTER_TIME = "20s";
+ /**
+ * default watch timeout in second
+ */
+ private static final int DEFAULT_WATCH_TIMEOUT = 60;
+
+ private MockConsulRegistryServiceImpl() {
+ // initial the capacity with 8
+ clusterAddressMap = new ConcurrentHashMap<>(MAP_INITIAL_CAPACITY);
+ listenerMap = new ConcurrentHashMap<>(MAP_INITIAL_CAPACITY);
+ notifiers = new ConcurrentHashMap<>(MAP_INITIAL_CAPACITY);
+ notifierExecutor = new ThreadPoolExecutor(
+ THREAD_POOL_NUM,
+ THREAD_POOL_NUM,
+ Integer.MAX_VALUE,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(),
+ new NamedThreadFactory("services-consul-notifier",
THREAD_POOL_NUM));
+ }
+
+ /**
+ * get instance of MockConsulRegistryServiceImpl
+ *
+ * @return instance
+ */
+ static MockConsulRegistryServiceImpl getInstance() {
+ if (instance == null) {
+ synchronized (MockConsulRegistryServiceImpl.class) {
+ if (instance == null) {
+ instance = new MockConsulRegistryServiceImpl();
+ }
+ }
+ }
+ return instance;
+ }
+
+ @Override
+ public void register(ServiceInstance instance) throws Exception {
+ InetSocketAddress address = instance.getAddress();
+ // Skip address validation for testing
+ // NetUtil.validAddress(address);
+
+ doRegister(instance);
+ // Immediately send TTL check to make service healthy
+ doTtlCheck(instance);
+ // Add heartbeat for re-registration and TTL check
+ RegistryHeartBeats.addHeartBeat(REGISTRY_TYPE, instance,
this::doRegister);
+ // Add TTL check to keep service healthy
+ RegistryHeartBeats.addHeartBeat(REGISTRY_TYPE, instance, 15000,
this::doTtlCheck);
+ }
+
+ private void doRegister(ServiceInstance instance) {
+ NewService service = createService(instance);
+ getConsulClient().agentServiceRegister(service, getAclToken());
+ }
+
+ private void doTtlCheck(ServiceInstance instance) throws Exception {
+ // Send TTL check to keep service healthy
+ String checkId = "service:" + createServiceId(instance.getAddress());
+ getConsulClient().agentCheckPass(checkId, getAclToken());
+ }
+
+ @Override
+ public void unregister(ServiceInstance instance) {
+ InetSocketAddress address = instance.getAddress();
+ // Skip address validation for testing
+ // NetUtil.validAddress(address);
+ getConsulClient().agentServiceDeregister(createServiceId(address),
getAclToken());
+ }
+
+ @Override
+ public void subscribe(String cluster, ConsulListener listener) {
+ // 1.add listener to subscribe list
+ listenerMap.computeIfAbsent(cluster, key -> new
HashSet<>()).add(listener);
+ // 2.get healthy services
+ Response<List<HealthService>> response = getHealthyServices(cluster,
-1, DEFAULT_WATCH_TIMEOUT);
+ // 3.get current consul index.
+ Long index = response.getConsulIndex();
+ ConsulNotifier notifier = notifiers.computeIfAbsent(cluster, key ->
new ConsulNotifier(cluster, index));
+ // 4.run notifier
+ notifierExecutor.submit(notifier);
+ }
+
+ @Override
+ public void unsubscribe(String cluster, ConsulListener listener) {
+ // 1.remove notifier for the cluster
+ ConsulNotifier notifier = notifiers.remove(cluster);
+ // 2.stop the notifier
+ notifier.stop();
+ }
+
+ @Override
+ public List<ServiceInstance> lookup(String key) {
+ transactionServiceGroup = key;
+ final String cluster = getServiceGroup(key);
+ if (cluster == null) {
+ String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR +
PREFIX_SERVICE_MAPPING + key;
+ throw new ConfigNotFoundException("%s configuration item is
required", missingDataId);
+ }
+ return lookupByCluster(cluster);
+ }
+
+ private List<ServiceInstance> lookupByCluster(String cluster) {
+ if (!listenerMap.containsKey(cluster)) {
+ // 1.refresh cluster
+ refreshCluster(cluster);
+ // 2. subscribe
+ subscribe(cluster, services -> refreshCluster(cluster, services));
+ }
+ return clusterAddressMap.get(cluster);
+ }
+
+ /**
+ * get consul client
+ *
+ * @return client
+ */
+ private ConsulClient getConsulClient() {
+ if (client == null) {
+ synchronized (MockConsulRegistryServiceImpl.class) {
+ if (client == null) {
+ String serverAddr =
FILE_CONFIG.getConfig(FILE_CONFIG_KEY_PREFIX + SERVER_ADDR_KEY);
+ InetSocketAddress inetSocketAddress =
NetUtil.toInetSocketAddress(serverAddr);
+ client = new ConsulClient(inetSocketAddress.getHostName(),
inetSocketAddress.getPort());
+ }
+ }
+ }
+ return client;
+ }
+
+ /**
+ * get cluster name , this function is only on the server use
+ *
+ * @return
+ */
+ private String getClusterName() {
+ String clusterConfigName =
+ String.join(FILE_CONFIG_SPLIT_CHAR, FILE_ROOT_REGISTRY,
REGISTRY_TYPE, REGISTRY_CLUSTER);
+ return FILE_CONFIG.getConfig(clusterConfigName, DEFAULT_CLUSTER_NAME);
+ }
+
+ /**
+ * create serviceId
+ *
+ * @param address
+ * @return serviceId
+ */
+ private String createServiceId(InetSocketAddress address) {
+ return getClusterName() + "-" + NetUtil.toStringAddress(address);
+ }
+
+ /**
+ * get consul acl-token
+ *
+ * @return acl-token
+ */
+ private static String getAclToken() {
+ String fileConfigKey = String.join(
+ ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR,
+ ConfigurationKeys.FILE_ROOT_REGISTRY,
+ REGISTRY_TYPE,
+ ACL_TOKEN);
+ String aclToken = StringUtils.isNotBlank(System.getProperty(ACL_TOKEN))
+ ? System.getProperty(ACL_TOKEN)
+ : FILE_CONFIG.getConfig(fileConfigKey);
+ return StringUtils.isNotBlank(aclToken) ? aclToken : null;
+ }
+
+ /**
+ * create a new service
+ *
+ * @param instance
+ * @return newService
+ */
+ private NewService createService(ServiceInstance instance) {
+ InetSocketAddress address = instance.getAddress();
+
+ NewService newService = new NewService();
+ newService.setId(createServiceId(address));
+ newService.setName(getClusterName());
+ newService.setTags(Collections.singletonList(SERVICE_TAG));
+ newService.setPort(address.getPort());
+ newService.setAddress(NetUtil.toIpAddress(address));
+ newService.setCheck(createCheck(address));
+
newService.setMeta(ServiceInstance.getStringMap(instance.getMetadata()));
+ return newService;
+ }
+
+ /**
+ * create service check based on TTL (for testing purposes)
+ * This allows the service to be considered healthy without actually
running on the port
+ *
+ * @param address
+ * @return
+ */
+ private NewService.Check createCheck(InetSocketAddress address) {
+ NewService.Check check = new NewService.Check();
+ // Use TTL check instead of TCP check for testing
+ check.setTtl("30s");
+ check.setDeregisterCriticalServiceAfter(DEFAULT_DEREGISTER_TIME);
+ return check;
+ }
+
+ /**
+ * get healthy services
+ *
+ * @param service
+ * @return
+ */
+ private Response<List<HealthService>> getHealthyServices(String service,
long index, long watchTimeout) {
+ return getConsulClient()
+ .getHealthServices(
+ service,
+ HealthServicesRequest.newBuilder()
+ .setTag(SERVICE_TAG)
+ .setQueryParams(new QueryParams(watchTimeout,
index))
+ .setPassing(true)
+ .setToken(getAclToken())
+ .build());
+ }
+
+ /**
+ * refresh cluster
+ *
+ * @param cluster
+ */
+ private void refreshCluster(String cluster) {
+ if (StringUtils.isBlank(cluster)) {
+ return;
+ }
+ Response<List<HealthService>> response = getHealthyServices(cluster,
-1, -1);
+ if (response == null) {
+ return;
+ }
+ refreshCluster(cluster, response.getValue());
+ }
+
+ /**
+ * refresh cluster
+ *
+ * @param cluster
+ * @param services
+ */
+ private void refreshCluster(String cluster, List<HealthService> services) {
+ if (cluster == null || services == null) {
+ return;
+ }
+
+ List<ServiceInstance> instances = services.stream()
+ .map(HealthService::getService)
+ .map(service -> {
+ InetSocketAddress address = new
InetSocketAddress(service.getAddress(), service.getPort());
+ Map<String, Object> metadata = new HashMap<>();
+ if (service.getMeta() != null) {
+ metadata.putAll(service.getMeta());
+ }
+ return new ServiceInstance(address, metadata);
+ })
+ .collect(Collectors.toList());
+
+ clusterAddressMap.put(cluster, instances);
+
+ removeOfflineAddressesIfNecessary(transactionServiceGroup, cluster,
instances);
+ }
+
+ /**
+ * consul notifier
+ */
+ private class ConsulNotifier implements Runnable {
+
+ private String cluster;
+ private long consulIndex;
+ private boolean running;
+ private boolean hasError = false;
+
+ ConsulNotifier(String cluster, long consulIndex) {
+ this.cluster = cluster;
+ this.consulIndex = consulIndex;
+ this.running = true;
+ }
+
+ @Override
+ public void run() {
+ while (this.running) {
+ try {
+ processService();
+ } catch (Exception exception) {
+ hasError = true;
+ LOGGER.error("consul refresh services error:{}",
exception.getMessage());
+ }
+ }
+ }
+
+ private void processService() {
+ Response<List<HealthService>> response =
getHealthyServices(cluster, consulIndex, DEFAULT_WATCH_TIMEOUT);
+ Long currentIndex = response.getConsulIndex();
+
+ if ((currentIndex != null && currentIndex > consulIndex) ||
hasError) {
Review Comment:
Variable [currentIndex](1) may be null at this access as suggested by
[this](2) null guard.
```suggestion
if (currentIndex != null && (currentIndex > consulIndex ||
hasError)) {
```
##########
discovery/seata-discovery-core/src/test/java/org/apache/seata/discovery/loadbalance/LoadBalanceTest.java:
##########
@@ -16,181 +16,229 @@
*/
package org.apache.seata.discovery.loadbalance;
+import org.apache.seata.common.metadata.ServiceInstance;
import org.apache.seata.common.rpc.RpcStatus;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
-/**
- * Created by guoyao on 2019/2/14.
- */
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
public class LoadBalanceTest {
private static final String XID = "XID";
/**
* Test random load balance select.
*
- * @param addresses the addresses
+ * @param instances the instances
*/
@ParameterizedTest
- @MethodSource("addressProvider")
- public void testRandomLoadBalance_select(List<InetSocketAddress>
addresses) {
+ @MethodSource("instanceProvider")
+ public void testRandomLoadBalance_select(List<ServiceInstance> instances)
throws Exception {
int runs = 10000;
- Map<InetSocketAddress, AtomicLong> counter = getSelectedCounter(runs,
addresses, new RandomLoadBalance());
- for (InetSocketAddress address : counter.keySet()) {
- Long count = counter.get(address).get();
- Assertions.assertTrue(count > 0, "selecte one time at last");
+ Map<ServiceInstance, AtomicLong> counter = getSelectedCounter(runs,
instances, new RandomLoadBalance());
+ for (ServiceInstance instance : counter.keySet()) {
+ Long count = counter.get(instance).get();
+ assertTrue(count > 0, "selecte one time at last");
}
}
/**
- * Test round robin load balance select.
+ * Test round-robin load balance select.
*
- * @param addresses the addresses
+ * @param instances the instances
*/
@ParameterizedTest
- @MethodSource("addressProvider")
- public void testRoundRobinLoadBalance_select(List<InetSocketAddress>
addresses) {
+ @MethodSource("instanceProvider")
+ public void testRoundRobinLoadBalance_select(List<ServiceInstance>
instances) throws Exception {
int runs = 10000;
- Map<InetSocketAddress, AtomicLong> counter = getSelectedCounter(runs,
addresses, new RoundRobinLoadBalance());
- for (InetSocketAddress address : counter.keySet()) {
- Long count = counter.get(address).get();
- Assertions.assertTrue(Math.abs(count - runs / (0f +
addresses.size())) < 1f, "abs diff shoud < 1");
+ Map<ServiceInstance, AtomicLong> counter = getSelectedCounter(runs,
instances, new RoundRobinLoadBalance());
+ for (ServiceInstance instance : counter.keySet()) {
+ Long count = counter.get(instance).get();
+ assertTrue(Math.abs(count - runs / (0f + instances.size())) < 1f,
"abs diff shoud < 1");
}
}
/**
* Test xid load load balance select.
*
- * @param addresses the addresses
+ * @param instances the instances
*/
@ParameterizedTest
- @MethodSource("addressProvider")
- public void testXIDLoadBalance_select(List<InetSocketAddress> addresses)
throws Exception {
+ @MethodSource("instanceProvider")
+ public void testXIDLoadBalance_select(List<ServiceInstance> instances)
throws Exception {
XIDLoadBalance loadBalance = new XIDLoadBalance();
// ipv4
- InetSocketAddress inetSocketAddress = loadBalance.select(addresses,
"127.0.0.1:8092:123456");
- Assertions.assertNotNull(inetSocketAddress);
+ ServiceInstance serviceInstance = loadBalance.select(instances,
"127.0.0.1:8092:123456");
+ assertNotNull(serviceInstance);
// ipv6
- inetSocketAddress = loadBalance.select(addresses,
"2000:0000:0000:0000:0001:2345:6789:abcd:8092:123456");
- Assertions.assertNotNull(inetSocketAddress);
+ serviceInstance = loadBalance.select(instances,
"2000:0000:0000:0000:0001:2345:6789:abcd:8092:123456");
+ assertNotNull(serviceInstance);
// test not found tc channel
- inetSocketAddress = loadBalance.select(addresses,
"127.0.0.1:8199:123456");
- Assertions.assertNotEquals(inetSocketAddress.getPort(), 8199);
+ serviceInstance = loadBalance.select(instances,
"127.0.0.1:8199:123456");
+ assertNotEquals(serviceInstance.getAddress().getPort(), 8199);
}
/**
- * Test consistent hash load load balance select.
+ * Test consistent hash load balance select.
*
- * @param addresses the addresses
+ * @param instances the instances
*/
@ParameterizedTest
- @MethodSource("addressProvider")
- public void testConsistentHashLoadBalance_select(List<InetSocketAddress>
addresses) {
+ @MethodSource("instanceProvider")
+ public void testConsistentHashLoadBalance_select(List<ServiceInstance>
instances) throws Exception {
int runs = 10000;
int selected = 0;
ConsistentHashLoadBalance loadBalance = new
ConsistentHashLoadBalance();
- Map<InetSocketAddress, AtomicLong> counter = getSelectedCounter(runs,
addresses, loadBalance);
- for (InetSocketAddress address : counter.keySet()) {
- if (counter.get(address).get() > 0) {
+ Map<ServiceInstance, AtomicLong> counter = getSelectedCounter(runs,
instances, loadBalance);
+ for (ServiceInstance instance : counter.keySet()) {
+ if (counter.get(instance).get() > 0) {
selected++;
}
}
- Assertions.assertEquals(1, selected, "selected must be equal to 1");
+ assertEquals(1, selected, "selected must be equal to 1");
}
/**
* Test cached consistent hash load balance select.
*
- * @param addresses the addresses
+ * @param instances the instances
*/
@ParameterizedTest
- @MethodSource("addressProvider")
- public void
testCachedConsistentHashLoadBalance_select(List<InetSocketAddress> addresses)
throws Exception {
+ @MethodSource("instanceProvider")
+ public void
testCachedConsistentHashLoadBalance_select(List<ServiceInstance> instances)
throws Exception {
ConsistentHashLoadBalance loadBalance = new
ConsistentHashLoadBalance();
- List<InetSocketAddress> addresses1 = new ArrayList<>(addresses);
- loadBalance.select(addresses1, XID);
+ List<ServiceInstance> instances1 = new ArrayList<>(instances);
+ loadBalance.select(instances1, XID);
Object o1 = getConsistentHashSelectorByReflect(loadBalance);
- List<InetSocketAddress> addresses2 = new ArrayList<>(addresses);
- loadBalance.select(addresses2, XID);
+ List<ServiceInstance> instances2 = new ArrayList<>(instances);
+ loadBalance.select(instances2, XID);
Object o2 = getConsistentHashSelectorByReflect(loadBalance);
- Assertions.assertEquals(o1, o2);
+ assertEquals(o1, o2);
- List<InetSocketAddress> addresses3 = new ArrayList<>(addresses);
-
addresses3.remove(ThreadLocalRandom.current().nextInt(addresses.size()));
- loadBalance.select(addresses3, XID);
+ List<ServiceInstance> instances3 = new ArrayList<>(instances);
+
instances3.remove(ThreadLocalRandom.current().nextInt(instances.size()));
+ loadBalance.select(instances3, XID);
Object o3 = getConsistentHashSelectorByReflect(loadBalance);
- Assertions.assertNotEquals(o1, o3);
+ assertNotEquals(o1, o3);
}
/**
* Test least active load balance select.
*
- * @param addresses the addresses
+ * @param instances the instances
*/
@ParameterizedTest
- @MethodSource("addressProvider")
- public void testLeastActiveLoadBalance_select(List<InetSocketAddress>
addresses) throws Exception {
+ @MethodSource("instanceProvider")
+ public void testLeastActiveLoadBalance_select(List<ServiceInstance>
instances) throws Exception {
int runs = 10000;
- int size = addresses.size();
+ int size = instances.size();
for (int i = 0; i < size - 1; i++) {
- RpcStatus.beginCount(addresses.get(i).toString());
+ RpcStatus.beginCount(instances.get(i).getAddress().toString());
}
- InetSocketAddress socketAddress = addresses.get(size - 1);
+ ServiceInstance targetInstance = instances.get(size - 1);
LoadBalance loadBalance = new LeastActiveLoadBalance();
for (int i = 0; i < runs; i++) {
- InetSocketAddress selectAddress = loadBalance.select(addresses,
XID);
- Assertions.assertEquals(selectAddress, socketAddress);
+ ServiceInstance selectInstance = loadBalance.select(instances,
XID);
+ assertEquals(selectInstance, targetInstance);
}
- RpcStatus.beginCount(socketAddress.toString());
- RpcStatus.beginCount(socketAddress.toString());
- Map<InetSocketAddress, AtomicLong> counter = getSelectedCounter(runs,
addresses, loadBalance);
- for (InetSocketAddress address : counter.keySet()) {
- Long count = counter.get(address).get();
- if (address == socketAddress) {
- Assertions.assertEquals(count, 0);
+ RpcStatus.beginCount(targetInstance.getAddress().toString());
+ RpcStatus.beginCount(targetInstance.getAddress().toString());
+ Map<ServiceInstance, AtomicLong> counter = getSelectedCounter(runs,
instances, loadBalance);
+ for (ServiceInstance instance : counter.keySet()) {
+ Long count = counter.get(instance).get();
+ if (instance == targetInstance) {
+ assertEquals(count, 0);
} else {
- Assertions.assertTrue(count > 0);
+ assertTrue(count > 0);
}
}
}
/**
- * Gets selected counter.
+ * Test weighted random load balance select with instances without weights.
+ * Should downgrade to random load balancing.
*
- * @param runs the runs
- * @param addresses the addresses
- * @param loadBalance the load balance
- * @return the selected counter
+ * @param instances the instances without weights
*/
- public Map<InetSocketAddress, AtomicLong> getSelectedCounter(
- int runs, List<InetSocketAddress> addresses, LoadBalance
loadBalance) {
- Assertions.assertNotNull(loadBalance);
- Map<InetSocketAddress, AtomicLong> counter = new ConcurrentHashMap<>();
- for (InetSocketAddress address : addresses) {
- counter.put(address, new AtomicLong(0));
+ @ParameterizedTest
+ @MethodSource("instanceProvider")
+ public void
testWeightedRandomLoadBalance_selectWithoutWeights(List<ServiceInstance>
instances) throws Exception {
+ int runs = 10000;
+ Map<ServiceInstance, AtomicLong> counter = getSelectedCounter(runs,
instances, new WeightedRandomLoadBalance());
+
+ // Verify all instances are selected roughly equally (random
distribution)
+ for (ServiceInstance instance : counter.keySet()) {
+ Long count = counter.get(instance).get();
Review Comment:
The variable 'count' is only assigned values of primitive type and is never
'null', but it is declared with the boxed type 'Long'.
##########
discovery/seata-discovery-core/src/test/java/org/apache/seata/discovery/loadbalance/LoadBalanceTest.java:
##########
@@ -16,181 +16,229 @@
*/
package org.apache.seata.discovery.loadbalance;
+import org.apache.seata.common.metadata.ServiceInstance;
import org.apache.seata.common.rpc.RpcStatus;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
-/**
- * Created by guoyao on 2019/2/14.
- */
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
public class LoadBalanceTest {
private static final String XID = "XID";
/**
* Test random load balance select.
*
- * @param addresses the addresses
+ * @param instances the instances
*/
@ParameterizedTest
- @MethodSource("addressProvider")
- public void testRandomLoadBalance_select(List<InetSocketAddress>
addresses) {
+ @MethodSource("instanceProvider")
+ public void testRandomLoadBalance_select(List<ServiceInstance> instances)
throws Exception {
int runs = 10000;
- Map<InetSocketAddress, AtomicLong> counter = getSelectedCounter(runs,
addresses, new RandomLoadBalance());
- for (InetSocketAddress address : counter.keySet()) {
- Long count = counter.get(address).get();
- Assertions.assertTrue(count > 0, "selecte one time at last");
+ Map<ServiceInstance, AtomicLong> counter = getSelectedCounter(runs,
instances, new RandomLoadBalance());
+ for (ServiceInstance instance : counter.keySet()) {
+ Long count = counter.get(instance).get();
+ assertTrue(count > 0, "selecte one time at last");
}
}
/**
- * Test round robin load balance select.
+ * Test round-robin load balance select.
*
- * @param addresses the addresses
+ * @param instances the instances
*/
@ParameterizedTest
- @MethodSource("addressProvider")
- public void testRoundRobinLoadBalance_select(List<InetSocketAddress>
addresses) {
+ @MethodSource("instanceProvider")
+ public void testRoundRobinLoadBalance_select(List<ServiceInstance>
instances) throws Exception {
int runs = 10000;
- Map<InetSocketAddress, AtomicLong> counter = getSelectedCounter(runs,
addresses, new RoundRobinLoadBalance());
- for (InetSocketAddress address : counter.keySet()) {
- Long count = counter.get(address).get();
- Assertions.assertTrue(Math.abs(count - runs / (0f +
addresses.size())) < 1f, "abs diff shoud < 1");
+ Map<ServiceInstance, AtomicLong> counter = getSelectedCounter(runs,
instances, new RoundRobinLoadBalance());
+ for (ServiceInstance instance : counter.keySet()) {
+ Long count = counter.get(instance).get();
+ assertTrue(Math.abs(count - runs / (0f + instances.size())) < 1f,
"abs diff shoud < 1");
}
}
/**
* Test xid load load balance select.
*
- * @param addresses the addresses
+ * @param instances the instances
*/
@ParameterizedTest
- @MethodSource("addressProvider")
- public void testXIDLoadBalance_select(List<InetSocketAddress> addresses)
throws Exception {
+ @MethodSource("instanceProvider")
+ public void testXIDLoadBalance_select(List<ServiceInstance> instances)
throws Exception {
XIDLoadBalance loadBalance = new XIDLoadBalance();
// ipv4
- InetSocketAddress inetSocketAddress = loadBalance.select(addresses,
"127.0.0.1:8092:123456");
- Assertions.assertNotNull(inetSocketAddress);
+ ServiceInstance serviceInstance = loadBalance.select(instances,
"127.0.0.1:8092:123456");
+ assertNotNull(serviceInstance);
// ipv6
- inetSocketAddress = loadBalance.select(addresses,
"2000:0000:0000:0000:0001:2345:6789:abcd:8092:123456");
- Assertions.assertNotNull(inetSocketAddress);
+ serviceInstance = loadBalance.select(instances,
"2000:0000:0000:0000:0001:2345:6789:abcd:8092:123456");
+ assertNotNull(serviceInstance);
// test not found tc channel
- inetSocketAddress = loadBalance.select(addresses,
"127.0.0.1:8199:123456");
- Assertions.assertNotEquals(inetSocketAddress.getPort(), 8199);
+ serviceInstance = loadBalance.select(instances,
"127.0.0.1:8199:123456");
+ assertNotEquals(serviceInstance.getAddress().getPort(), 8199);
}
/**
- * Test consistent hash load load balance select.
+ * Test consistent hash load balance select.
*
- * @param addresses the addresses
+ * @param instances the instances
*/
@ParameterizedTest
- @MethodSource("addressProvider")
- public void testConsistentHashLoadBalance_select(List<InetSocketAddress>
addresses) {
+ @MethodSource("instanceProvider")
+ public void testConsistentHashLoadBalance_select(List<ServiceInstance>
instances) throws Exception {
int runs = 10000;
int selected = 0;
ConsistentHashLoadBalance loadBalance = new
ConsistentHashLoadBalance();
- Map<InetSocketAddress, AtomicLong> counter = getSelectedCounter(runs,
addresses, loadBalance);
- for (InetSocketAddress address : counter.keySet()) {
- if (counter.get(address).get() > 0) {
+ Map<ServiceInstance, AtomicLong> counter = getSelectedCounter(runs,
instances, loadBalance);
+ for (ServiceInstance instance : counter.keySet()) {
+ if (counter.get(instance).get() > 0) {
selected++;
}
}
- Assertions.assertEquals(1, selected, "selected must be equal to 1");
+ assertEquals(1, selected, "selected must be equal to 1");
}
/**
* Test cached consistent hash load balance select.
*
- * @param addresses the addresses
+ * @param instances the instances
*/
@ParameterizedTest
- @MethodSource("addressProvider")
- public void
testCachedConsistentHashLoadBalance_select(List<InetSocketAddress> addresses)
throws Exception {
+ @MethodSource("instanceProvider")
+ public void
testCachedConsistentHashLoadBalance_select(List<ServiceInstance> instances)
throws Exception {
ConsistentHashLoadBalance loadBalance = new
ConsistentHashLoadBalance();
- List<InetSocketAddress> addresses1 = new ArrayList<>(addresses);
- loadBalance.select(addresses1, XID);
+ List<ServiceInstance> instances1 = new ArrayList<>(instances);
+ loadBalance.select(instances1, XID);
Object o1 = getConsistentHashSelectorByReflect(loadBalance);
- List<InetSocketAddress> addresses2 = new ArrayList<>(addresses);
- loadBalance.select(addresses2, XID);
+ List<ServiceInstance> instances2 = new ArrayList<>(instances);
+ loadBalance.select(instances2, XID);
Object o2 = getConsistentHashSelectorByReflect(loadBalance);
- Assertions.assertEquals(o1, o2);
+ assertEquals(o1, o2);
- List<InetSocketAddress> addresses3 = new ArrayList<>(addresses);
-
addresses3.remove(ThreadLocalRandom.current().nextInt(addresses.size()));
- loadBalance.select(addresses3, XID);
+ List<ServiceInstance> instances3 = new ArrayList<>(instances);
+
instances3.remove(ThreadLocalRandom.current().nextInt(instances.size()));
+ loadBalance.select(instances3, XID);
Object o3 = getConsistentHashSelectorByReflect(loadBalance);
- Assertions.assertNotEquals(o1, o3);
+ assertNotEquals(o1, o3);
}
/**
* Test least active load balance select.
*
- * @param addresses the addresses
+ * @param instances the instances
*/
@ParameterizedTest
- @MethodSource("addressProvider")
- public void testLeastActiveLoadBalance_select(List<InetSocketAddress>
addresses) throws Exception {
+ @MethodSource("instanceProvider")
+ public void testLeastActiveLoadBalance_select(List<ServiceInstance>
instances) throws Exception {
int runs = 10000;
- int size = addresses.size();
+ int size = instances.size();
for (int i = 0; i < size - 1; i++) {
- RpcStatus.beginCount(addresses.get(i).toString());
+ RpcStatus.beginCount(instances.get(i).getAddress().toString());
}
- InetSocketAddress socketAddress = addresses.get(size - 1);
+ ServiceInstance targetInstance = instances.get(size - 1);
LoadBalance loadBalance = new LeastActiveLoadBalance();
for (int i = 0; i < runs; i++) {
- InetSocketAddress selectAddress = loadBalance.select(addresses,
XID);
- Assertions.assertEquals(selectAddress, socketAddress);
+ ServiceInstance selectInstance = loadBalance.select(instances,
XID);
+ assertEquals(selectInstance, targetInstance);
}
- RpcStatus.beginCount(socketAddress.toString());
- RpcStatus.beginCount(socketAddress.toString());
- Map<InetSocketAddress, AtomicLong> counter = getSelectedCounter(runs,
addresses, loadBalance);
- for (InetSocketAddress address : counter.keySet()) {
- Long count = counter.get(address).get();
- if (address == socketAddress) {
- Assertions.assertEquals(count, 0);
+ RpcStatus.beginCount(targetInstance.getAddress().toString());
+ RpcStatus.beginCount(targetInstance.getAddress().toString());
+ Map<ServiceInstance, AtomicLong> counter = getSelectedCounter(runs,
instances, loadBalance);
+ for (ServiceInstance instance : counter.keySet()) {
+ Long count = counter.get(instance).get();
+ if (instance == targetInstance) {
+ assertEquals(count, 0);
} else {
- Assertions.assertTrue(count > 0);
+ assertTrue(count > 0);
}
}
}
/**
- * Gets selected counter.
+ * Test weighted random load balance select with instances without weights.
+ * Should downgrade to random load balancing.
*
- * @param runs the runs
- * @param addresses the addresses
- * @param loadBalance the load balance
- * @return the selected counter
+ * @param instances the instances without weights
*/
- public Map<InetSocketAddress, AtomicLong> getSelectedCounter(
- int runs, List<InetSocketAddress> addresses, LoadBalance
loadBalance) {
- Assertions.assertNotNull(loadBalance);
- Map<InetSocketAddress, AtomicLong> counter = new ConcurrentHashMap<>();
- for (InetSocketAddress address : addresses) {
- counter.put(address, new AtomicLong(0));
+ @ParameterizedTest
+ @MethodSource("instanceProvider")
+ public void
testWeightedRandomLoadBalance_selectWithoutWeights(List<ServiceInstance>
instances) throws Exception {
+ int runs = 10000;
+ Map<ServiceInstance, AtomicLong> counter = getSelectedCounter(runs,
instances, new WeightedRandomLoadBalance());
+
+ // Verify all instances are selected roughly equally (random
distribution)
+ for (ServiceInstance instance : counter.keySet()) {
+ Long count = counter.get(instance).get();
+ assertTrue(count > 0);
+
+ // In random distribution, each instance should be selected
roughly 1/n times
+ double expectedCount = runs / (double) instances.size();
+ double actualCount = count;
+ double tolerance = expectedCount * 0.2; // 20% tolerance
+ assertTrue(Math.abs(actualCount - expectedCount) < tolerance);
}
- try {
- for (int i = 0; i < runs; i++) {
- InetSocketAddress selectAddress =
loadBalance.select(addresses, XID);
- counter.get(selectAddress).incrementAndGet();
- }
- } catch (Exception e) {
- // do nothing
+ }
+
+ /**
+ * Test weighted random load balance select with weighted instances.
+ * Verifies that instances with higher weights are selected more
frequently.
+ *
+ * @param instances the instances with weights configured
+ */
+ @ParameterizedTest
+ @MethodSource("weightedInstanceProvider")
+ public void
testWeightedRandomLoadBalance_selectWithWeights(List<ServiceInstance>
instances) throws Exception {
+ int runs = 10000;
+ Map<ServiceInstance, AtomicLong> counter = getSelectedCounter(runs,
instances, new WeightedRandomLoadBalance());
+
+ // Verify all instances are selected at least once
+ for (ServiceInstance instance : counter.keySet()) {
+ Long count = counter.get(instance).get();
Review Comment:
The variable 'count' is only assigned values of primitive type and is never
'null', but it is declared with the boxed type 'Long'.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]