This is an automated email from the ASF dual-hosted git repository. jianbin pushed a commit to branch 2.x in repository https://gitbox.apache.org/repos/asf/incubator-seata.git
The following commit(s) were added to refs/heads/2.x by this push: new b0c2bb265f optimize: fall back to any of available cluster address when query cluster address is empty (#6797) b0c2bb265f is described below commit b0c2bb265f7921ccba59622099c1229975884c2e Author: laywin <lay...@yeah.net> AuthorDate: Fri Aug 30 11:28:03 2024 +0800 optimize: fall back to any of available cluster address when query cluster address is empty (#6797) --- changes/en-us/2.x.md | 2 + changes/zh-cn/2.x.md | 3 +- .../registry/consul/ConsulRegistryServiceImpl.java | 5 +- .../seata/discovery/registry/RegistryService.java | 41 ++++++++++++---- .../registry/etcd3/EtcdRegistryServiceImpl.java | 5 +- .../registry/eureka/EurekaRegistryServiceImpl.java | 5 +- .../registry/nacos/NacosRegistryServiceImpl.java | 4 +- .../NamingserverRegistryServiceImpl.java | 55 ++++++++++++++-------- .../registry/redis/RedisRegistryServiceImpl.java | 5 +- .../registry/sofa/SofaRegistryServiceImpl.java | 5 +- .../registry/zk/ZookeeperRegisterServiceImpl.java | 5 +- .../zk/ZookeeperRegisterServiceImplTest.java | 28 ++++++----- 12 files changed, 113 insertions(+), 50 deletions(-) diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index 11b52612f7..a779856f93 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -28,6 +28,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6778](https://github.com/apache/incubator-seata/pull/6778)] fix namingserver node term - [[#6765](https://github.com/apache/incubator-seata/pull/6765)] fix MySQL driver loading by replacing custom classloader with system classloader for better compatibility and simplified process - [[#6781](https://github.com/apache/incubator-seata/pull/6781)] the issue where the TC occasionally fails to go offline from the NamingServer +- [[#6797](https://github.com/apache/incubator-seata/pull/6797)] fall back to any of available cluster address when query cluster address is empty ### optimize: @@ -113,6 +114,7 @@ Thanks to these contributors for their code commits. Please report an unintended - [imashimaro](https://github.com/hmj776521114) - [lyl2008dsg](https://github.com/lyl2008dsg) - [l81893521](https://github.com/l81893521) +- [laywin](https://github.com/laywin) Also, we receive many valuable issues, questions and advices from our community. Thanks for you all. diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 2762ce06f9..a915329758 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -29,6 +29,7 @@ - [[#6778](https://github.com/apache/incubator-seata/pull/6778)] 修复namingserver的节点term为0问题 - [[#6765](https://github.com/apache/incubator-seata/pull/6765)] 改进MySQL驱动加载机制,将自定义类加载器替换为系统类加载器,更兼容简化流程 - [[#6781](https://github.com/apache/incubator-seata/pull/6781)] 修复tc下线时,由于定时任务没有先关闭,导致下线后还会被注册上,需要靠namingserver的健康检查来下线的bug +- [[#6797](https://github.com/apache/incubator-seata/pull/6797)] 当查询的集群地址为空时,获取可用的任意集群地址 ### optimize: @@ -73,7 +74,6 @@ - [[#6793](https://github.com/apache/incubator-seata/pull/6793)] 修复 npmjs 依赖冲突问题 - [[#6794](https://github.com/apache/incubator-seata/pull/6794)] 优化 NacosMockTest 单测问题 - ### refactor: @@ -117,6 +117,7 @@ - [imashimaro](https://github.com/hmj776521114) - [lyl2008dsg](https://github.com/lyl2008dsg) - [l81893521](https://github.com/l81893521) +- [laywin](https://github.com/laywin) diff --git a/discovery/seata-discovery-consul/src/main/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java b/discovery/seata-discovery-consul/src/main/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java index e44ac33b3a..72fa626b1f 100644 --- a/discovery/seata-discovery-consul/src/main/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java +++ b/discovery/seata-discovery-consul/src/main/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java @@ -75,6 +75,8 @@ public class ConsulRegistryServiceImpl implements RegistryService<ConsulListener private static final int THREAD_POOL_NUM = 1; private static final int MAP_INITIAL_CAPACITY = 8; + private String transactionServiceGroup; + /** * default tcp check interval */ @@ -161,6 +163,7 @@ public class ConsulRegistryServiceImpl implements RegistryService<ConsulListener @Override public List<InetSocketAddress> lookup(String key) throws Exception { + transactionServiceGroup = key; final String cluster = getServiceGroup(key); if (cluster == null) { String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key; @@ -311,7 +314,7 @@ public class ConsulRegistryServiceImpl implements RegistryService<ConsulListener clusterAddressMap.put(cluster, addresses); - removeOfflineAddressesIfNecessary(cluster, addresses); + removeOfflineAddressesIfNecessary(transactionServiceGroup, cluster, addresses); } /** diff --git a/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java b/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java index fd9561434f..efc997b20b 100644 --- a/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java +++ b/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java @@ -16,8 +16,10 @@ */ package org.apache.seata.discovery.registry; +import org.apache.seata.common.util.CollectionUtils; +import org.apache.seata.config.ConfigurationFactory; + import java.net.InetSocketAddress; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -27,8 +29,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; -import org.apache.seata.config.ConfigurationFactory; - /** * The interface Registry service. * @@ -54,7 +54,7 @@ public interface RegistryService<T> { /** * Service node health check */ - Map<String,List<InetSocketAddress>> CURRENT_ADDRESS_MAP = new ConcurrentHashMap<>(); + Map<String, Map<String, List<InetSocketAddress>>> CURRENT_ADDRESS_MAP = new ConcurrentHashMap<>(); /** * Register. * @@ -119,12 +119,29 @@ public interface RegistryService<T> { } default List<InetSocketAddress> aliveLookup(String transactionServiceGroup) { - return CURRENT_ADDRESS_MAP.computeIfAbsent(getServiceGroup(transactionServiceGroup), k -> new ArrayList<>()); + Map<String, List<InetSocketAddress>> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup, + k -> new ConcurrentHashMap<>()); + + String clusterName = getServiceGroup(transactionServiceGroup); + List<InetSocketAddress> inetSocketAddresses = clusterAddressMap.get(clusterName); + if (CollectionUtils.isNotEmpty(inetSocketAddresses)) { + return inetSocketAddresses; + } + + // fall back to addresses of any cluster + return clusterAddressMap.values().stream().filter(CollectionUtils::isNotEmpty) + .findAny().orElse(Collections.emptyList()); } default List<InetSocketAddress> refreshAliveLookup(String transactionServiceGroup, List<InetSocketAddress> aliveAddress) { - return CURRENT_ADDRESS_MAP.put(getServiceGroup(transactionServiceGroup), aliveAddress); + + Map<String, List<InetSocketAddress>> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup, + key -> new ConcurrentHashMap<>()); + + String clusterName = getServiceGroup(transactionServiceGroup); + + return clusterAddressMap.put(clusterName, aliveAddress); } @@ -137,15 +154,21 @@ public interface RegistryService<T> { * @param clusterName * @param newAddressed */ - default void removeOfflineAddressesIfNecessary(String clusterName, Collection<InetSocketAddress> newAddressed) { + default void removeOfflineAddressesIfNecessary(String transactionGroupService, String clusterName, Collection<InetSocketAddress> newAddressed) { + + Map<String, List<InetSocketAddress>> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionGroupService, + key -> new ConcurrentHashMap<>()); - List<InetSocketAddress> currentAddresses = CURRENT_ADDRESS_MAP.getOrDefault(clusterName, Collections.emptyList()); + List<InetSocketAddress> currentAddresses = clusterAddressMap.getOrDefault(clusterName, Collections.emptyList()); List<InetSocketAddress> inetSocketAddresses = currentAddresses .stream().filter(newAddressed::contains).collect( Collectors.toList()); - CURRENT_ADDRESS_MAP.put(clusterName, inetSocketAddresses); + // prevent empty update + if (CollectionUtils.isNotEmpty(inetSocketAddresses)) { + clusterAddressMap.put(clusterName, inetSocketAddresses); + } } } diff --git a/discovery/seata-discovery-etcd3/src/main/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java b/discovery/seata-discovery-etcd3/src/main/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java index 70e3dfe852..96d3cf0511 100644 --- a/discovery/seata-discovery-etcd3/src/main/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java +++ b/discovery/seata-discovery-etcd3/src/main/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java @@ -75,6 +75,8 @@ public class EtcdRegistryServiceImpl implements RegistryService<Watch.Listener> private static final int MAP_INITIAL_CAPACITY = 8; private static final int THREAD_POOL_SIZE = 2; private ExecutorService executorService; + + private String transactionServiceGroup; /** * TTL for lease */ @@ -181,6 +183,7 @@ public class EtcdRegistryServiceImpl implements RegistryService<Watch.Listener> @Override public List<InetSocketAddress> lookup(String key) throws Exception { + transactionServiceGroup = key; final String cluster = getServiceGroup(key); if (cluster == null) { String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key; @@ -252,7 +255,7 @@ public class EtcdRegistryServiceImpl implements RegistryService<Watch.Listener> }).collect(Collectors.toList()); clusterAddressMap.put(cluster, new Pair<>(getResponse.getHeader().getRevision(), instanceList)); - removeOfflineAddressesIfNecessary(cluster, instanceList); + removeOfflineAddressesIfNecessary(transactionServiceGroup, cluster, instanceList); } /** diff --git a/discovery/seata-discovery-eureka/src/main/java/org/apache/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java b/discovery/seata-discovery-eureka/src/main/java/org/apache/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java index 80d640c57c..5ab5191234 100644 --- a/discovery/seata-discovery-eureka/src/main/java/org/apache/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java +++ b/discovery/seata-discovery-eureka/src/main/java/org/apache/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java @@ -75,6 +75,8 @@ public class EurekaRegistryServiceImpl implements RegistryService<EurekaEventLis private static volatile EurekaRegistryServiceImpl instance; private static volatile EurekaClient eurekaClient; + private String transactionServiceGroup; + private EurekaRegistryServiceImpl() { } @@ -130,6 +132,7 @@ public class EurekaRegistryServiceImpl implements RegistryService<EurekaEventLis @Override public List<InetSocketAddress> lookup(String key) throws Exception { + transactionServiceGroup = key; String clusterName = getServiceGroup(key); if (clusterName == null) { String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key; @@ -169,7 +172,7 @@ public class EurekaRegistryServiceImpl implements RegistryService<EurekaEventLis .collect(Collectors.toList()); CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList); - removeOfflineAddressesIfNecessary(clusterName, newAddressList); + removeOfflineAddressesIfNecessary(transactionServiceGroup, clusterName, newAddressList); } } diff --git a/discovery/seata-discovery-nacos/src/main/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java b/discovery/seata-discovery-nacos/src/main/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java index 7ddda0d0e7..1f6abccba4 100644 --- a/discovery/seata-discovery-nacos/src/main/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java +++ b/discovery/seata-discovery-nacos/src/main/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java @@ -84,6 +84,8 @@ public class NacosRegistryServiceImpl implements RegistryService<EventListener> private static final Pattern DEFAULT_SLB_REGISTRY_PATTERN = Pattern.compile("(?!.*internal)(?=.*seata).*mse.aliyuncs.com"); private static volatile Boolean useSLBWay; + private String transactionServiceGroup; + private NacosRegistryServiceImpl() { String configForNacosSLB = FILE_CONFIG.getConfig(getNacosUrlPatternOfSLB()); Pattern patternOfNacosRegistryForSLB = StringUtils.isBlank(configForNacosSLB) @@ -193,7 +195,7 @@ public class NacosRegistryServiceImpl implements RegistryService<EventListener> .collect(Collectors.toList()); CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList); - removeOfflineAddressesIfNecessary(clusterName, newAddressList); + removeOfflineAddressesIfNecessary(transactionServiceGroup, clusterName, newAddressList); } }); } diff --git a/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java b/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java index 2781f4687b..a479fe69d1 100644 --- a/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java +++ b/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java @@ -16,16 +16,16 @@ */ package org.apache.seata.discovery.registry.namingserver; - import java.io.IOException; import java.net.InetSocketAddress; import java.rmi.RemoteException; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; +import java.util.HashMap; +import java.util.Objects; import java.util.Map; +import java.util.ArrayList; import java.util.Arrays; -import java.util.Objects; +import java.util.Collections; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; @@ -41,22 +41,23 @@ import java.util.stream.Collectors; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.http.HttpStatus; +import org.apache.http.StatusLine; +import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.entity.ContentType; import org.apache.http.protocol.HTTP; +import org.apache.http.util.EntityUtils; import org.apache.seata.common.metadata.Node; import org.apache.seata.common.metadata.namingserver.Instance; import org.apache.seata.common.metadata.namingserver.MetaResponse; import org.apache.seata.common.thread.NamedThreadFactory; +import org.apache.seata.common.util.CollectionUtils; +import org.apache.seata.common.util.HttpClientUtil; import org.apache.seata.common.util.NetUtil; import org.apache.seata.common.util.StringUtils; import org.apache.seata.config.Configuration; import org.apache.seata.config.ConfigurationFactory; -import org.apache.seata.common.util.HttpClientUtil; import org.apache.seata.discovery.registry.RegistryService; -import org.apache.http.HttpStatus; -import org.apache.http.StatusLine; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -322,17 +323,6 @@ public class NamingserverRegistryServiceImpl implements RegistryService<NamingLi isSubscribed = false; } - @Override - public List<InetSocketAddress> aliveLookup(String transactionServiceGroup) { - return CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup, k -> new ArrayList<>()); - } - - @Override - public List<InetSocketAddress> refreshAliveLookup(String transactionServiceGroup, - List<InetSocketAddress> aliveAddress) { - return CURRENT_ADDRESS_MAP.put(transactionServiceGroup, aliveAddress); - } - /** * @param key vGroup name * @return List<InetSocketAddress> available instance list @@ -413,6 +403,31 @@ public class NamingserverRegistryServiceImpl implements RegistryService<NamingLi return namespace; } + @Override + public List<InetSocketAddress> aliveLookup(String transactionServiceGroup) { + Map<String, List<InetSocketAddress>> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup, + k -> new ConcurrentHashMap<>()); + + List<InetSocketAddress> inetSocketAddresses = clusterAddressMap.get(transactionServiceGroup); + if (CollectionUtils.isNotEmpty(inetSocketAddresses)) { + return inetSocketAddresses; + } + + // fall back to addresses of any cluster + return clusterAddressMap.values().stream().filter(CollectionUtils::isNotEmpty) + .findAny().orElse(Collections.emptyList()); + } + + @Override + public List<InetSocketAddress> refreshAliveLookup(String transactionServiceGroup, + List<InetSocketAddress> aliveAddress) { + Map<String, List<InetSocketAddress>> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup, + key -> new ConcurrentHashMap<>()); + + + return clusterAddressMap.put(transactionServiceGroup, aliveAddress); + } + /** * get one namingserver url diff --git a/discovery/seata-discovery-redis/src/main/java/org/apache/seata/discovery/registry/redis/RedisRegistryServiceImpl.java b/discovery/seata-discovery-redis/src/main/java/org/apache/seata/discovery/registry/redis/RedisRegistryServiceImpl.java index 769799f693..6c18a57e72 100644 --- a/discovery/seata-discovery-redis/src/main/java/org/apache/seata/discovery/registry/redis/RedisRegistryServiceImpl.java +++ b/discovery/seata-discovery-redis/src/main/java/org/apache/seata/discovery/registry/redis/RedisRegistryServiceImpl.java @@ -74,6 +74,8 @@ public class RedisRegistryServiceImpl implements RegistryService<RedisListener> private static final long KEY_TTL = 5L; private static final long KEY_REFRESH_PERIOD = 2000L; + private String transactionServiceGroup; + private ScheduledExecutorService threadPoolExecutorForSubscribe = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("RedisRegistryService-subscribe", 1)); private ScheduledExecutorService threadPoolExecutorForUpdateMap = new ScheduledThreadPoolExecutor(1, @@ -219,6 +221,7 @@ public class RedisRegistryServiceImpl implements RegistryService<RedisListener> @Override public List<InetSocketAddress> lookup(String key) { + transactionServiceGroup = key; String clusterName = getServiceGroup(key); if (clusterName == null) { String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key; @@ -280,7 +283,7 @@ public class RedisRegistryServiceImpl implements RegistryService<RedisListener> } socketAddresses.remove(inetSocketAddress); - removeOfflineAddressesIfNecessary(notifyCluserName, socketAddresses); + removeOfflineAddressesIfNecessary(transactionServiceGroup, notifyCluserName, socketAddresses); } @Override diff --git a/discovery/seata-discovery-sofa/src/main/java/org/apache/seata/discovery/registry/sofa/SofaRegistryServiceImpl.java b/discovery/seata-discovery-sofa/src/main/java/org/apache/seata/discovery/registry/sofa/SofaRegistryServiceImpl.java index a7e4e087fa..fa0b428dda 100644 --- a/discovery/seata-discovery-sofa/src/main/java/org/apache/seata/discovery/registry/sofa/SofaRegistryServiceImpl.java +++ b/discovery/seata-discovery-sofa/src/main/java/org/apache/seata/discovery/registry/sofa/SofaRegistryServiceImpl.java @@ -81,6 +81,8 @@ public class SofaRegistryServiceImpl implements RegistryService<SubscriberDataOb private static volatile SofaRegistryServiceImpl instance; + private String transactionServiceGroup; + private SofaRegistryServiceImpl() { } @@ -159,6 +161,7 @@ public class SofaRegistryServiceImpl implements RegistryService<SubscriberDataOb @Override public List<InetSocketAddress> lookup(String key) throws Exception { + transactionServiceGroup = key; String clusterName = getServiceGroup(key); if (clusterName == null) { String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key; @@ -174,7 +177,7 @@ public class SofaRegistryServiceImpl implements RegistryService<SubscriberDataOb List<InetSocketAddress> newAddressList = flatData(instances); CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList); - removeOfflineAddressesIfNecessary(clusterName, newAddressList); + removeOfflineAddressesIfNecessary(transactionServiceGroup, clusterName, newAddressList); } respondRegistries.countDown(); }); diff --git a/discovery/seata-discovery-zk/src/main/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java b/discovery/seata-discovery-zk/src/main/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java index 9b112b1efb..84e70abe04 100644 --- a/discovery/seata-discovery-zk/src/main/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java +++ b/discovery/seata-discovery-zk/src/main/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java @@ -78,6 +78,8 @@ public class ZookeeperRegisterServiceImpl implements RegistryService<IZkChildLis private static final int REGISTERED_PATH_SET_SIZE = 1; private static final Set<String> REGISTERED_PATH_SET = Collections.synchronizedSet(new HashSet<>(REGISTERED_PATH_SET_SIZE)); + private String transactionServiceGroup; + private ZookeeperRegisterServiceImpl() { } @@ -175,6 +177,7 @@ public class ZookeeperRegisterServiceImpl implements RegistryService<IZkChildLis */ @Override public List<InetSocketAddress> lookup(String key) throws Exception { + transactionServiceGroup = key; String clusterName = getServiceGroup(key); if (clusterName == null) { @@ -309,7 +312,7 @@ public class ZookeeperRegisterServiceImpl implements RegistryService<IZkChildLis } CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList); - removeOfflineAddressesIfNecessary(clusterName, newAddressList); + removeOfflineAddressesIfNecessary(transactionServiceGroup, clusterName, newAddressList); } private String getClusterName() { diff --git a/discovery/seata-discovery-zk/src/test/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java b/discovery/seata-discovery-zk/src/test/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java index cd7afa792f..dffd1a279a 100644 --- a/discovery/seata-discovery-zk/src/test/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java +++ b/discovery/seata-discovery-zk/src/test/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java @@ -18,9 +18,7 @@ package org.apache.seata.discovery.registry.zk; import java.lang.reflect.Field; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; +import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -128,18 +126,22 @@ public class ZookeeperRegisterServiceImplTest { @Test public void testRemoveOfflineAddressesIfNecessaryNoRemoveCase() { - service.CURRENT_ADDRESS_MAP.put("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); - service.removeOfflineAddressesIfNecessary("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); + Map<String, List<InetSocketAddress>> addresses = service.CURRENT_ADDRESS_MAP.computeIfAbsent("default_tx_group", k -> new HashMap<>()); + addresses.put("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); + service.removeOfflineAddressesIfNecessary("default_tx_group","cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); - Assertions.assertEquals(1, service.CURRENT_ADDRESS_MAP.get("cluster").size()); + Assertions.assertEquals(1, service.CURRENT_ADDRESS_MAP.get("default_tx_group").get("cluster").size()); } @Test - public void testRemoveOfflineAddressesIfNecessaryRemoveCase() { - service.CURRENT_ADDRESS_MAP.put("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); - service.removeOfflineAddressesIfNecessary("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.2", 8091))); + public void testRemovePreventEmptyPushCase() { + Map<String, List<InetSocketAddress>> addresses = service.CURRENT_ADDRESS_MAP.computeIfAbsent("default_tx_group", k -> new HashMap<>()); - Assertions.assertEquals(0, service.CURRENT_ADDRESS_MAP.get("cluster").size()); + addresses.put("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); + + service.removeOfflineAddressesIfNecessary("default_tx_group", "cluster", Collections.singletonList(new InetSocketAddress("127.0.0.2", 8091))); + + Assertions.assertEquals(1, service.CURRENT_ADDRESS_MAP.get("default_tx_group").get("cluster").size()); } @Test @@ -148,7 +150,8 @@ public class ZookeeperRegisterServiceImplTest { System.setProperty("txServiceGroup", "default_tx_group"); System.setProperty("service.vgroupMapping.default_tx_group", "cluster"); - service.CURRENT_ADDRESS_MAP.put("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); + Map<String, List<InetSocketAddress>> addresses = service.CURRENT_ADDRESS_MAP.computeIfAbsent("default_tx_group", k -> new HashMap<>()); + addresses.put("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); List<InetSocketAddress> result = service.aliveLookup("default_tx_group"); Assertions.assertEquals(result, Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); @@ -161,12 +164,11 @@ public class ZookeeperRegisterServiceImplTest { System.setProperty("txServiceGroup", "default_tx_group"); System.setProperty("service.vgroupMapping.default_tx_group", "cluster"); - service.CURRENT_ADDRESS_MAP.put("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); service.refreshAliveLookup("default_tx_group", Collections.singletonList(new InetSocketAddress("127.0.0.2", 8091))); - Assertions.assertEquals(service.CURRENT_ADDRESS_MAP.get("cluster"), + Assertions.assertEquals(service.CURRENT_ADDRESS_MAP.get("default_tx_group").get("cluster"), Collections.singletonList(new InetSocketAddress("127.0.0.2", 8091))); } } --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org For additional commands, e-mail: notifications-h...@seata.apache.org