This is an automated email from the ASF dual-hosted git repository.

jianbin pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git


The following commit(s) were added to refs/heads/2.x by this push:
     new b0c2bb265f optimize: fall back to any of available cluster address 
when query cluster address is empty (#6797)
b0c2bb265f is described below

commit b0c2bb265f7921ccba59622099c1229975884c2e
Author: laywin <lay...@yeah.net>
AuthorDate: Fri Aug 30 11:28:03 2024 +0800

    optimize: fall back to any of available cluster address when query cluster 
address is empty (#6797)
---
 changes/en-us/2.x.md                               |  2 +
 changes/zh-cn/2.x.md                               |  3 +-
 .../registry/consul/ConsulRegistryServiceImpl.java |  5 +-
 .../seata/discovery/registry/RegistryService.java  | 41 ++++++++++++----
 .../registry/etcd3/EtcdRegistryServiceImpl.java    |  5 +-
 .../registry/eureka/EurekaRegistryServiceImpl.java |  5 +-
 .../registry/nacos/NacosRegistryServiceImpl.java   |  4 +-
 .../NamingserverRegistryServiceImpl.java           | 55 ++++++++++++++--------
 .../registry/redis/RedisRegistryServiceImpl.java   |  5 +-
 .../registry/sofa/SofaRegistryServiceImpl.java     |  5 +-
 .../registry/zk/ZookeeperRegisterServiceImpl.java  |  5 +-
 .../zk/ZookeeperRegisterServiceImplTest.java       | 28 ++++++-----
 12 files changed, 113 insertions(+), 50 deletions(-)

diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md
index 11b52612f7..a779856f93 100644
--- a/changes/en-us/2.x.md
+++ b/changes/en-us/2.x.md
@@ -28,6 +28,7 @@ Add changes here for all PR submitted to the 2.x branch.
 - [[#6778](https://github.com/apache/incubator-seata/pull/6778)] fix 
namingserver node term
 - [[#6765](https://github.com/apache/incubator-seata/pull/6765)] fix MySQL 
driver loading by replacing custom classloader with system classloader for 
better compatibility and simplified process
 - [[#6781](https://github.com/apache/incubator-seata/pull/6781)] the issue 
where the TC occasionally fails to go offline from the NamingServer
+- [[#6797](https://github.com/apache/incubator-seata/pull/6797)] fall back to 
any of available cluster address when query cluster address is empty
 
 
 ### optimize:
@@ -113,6 +114,7 @@ Thanks to these contributors for their code commits. Please 
report an unintended
 - [imashimaro](https://github.com/hmj776521114)
 - [lyl2008dsg](https://github.com/lyl2008dsg)
 - [l81893521](https://github.com/l81893521)
+- [laywin](https://github.com/laywin)
 
 
 Also, we receive many valuable issues, questions and advices from our 
community. Thanks for you all.
diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md
index 2762ce06f9..a915329758 100644
--- a/changes/zh-cn/2.x.md
+++ b/changes/zh-cn/2.x.md
@@ -29,6 +29,7 @@
 - [[#6778](https://github.com/apache/incubator-seata/pull/6778)] 
修复namingserver的节点term为0问题
 - [[#6765](https://github.com/apache/incubator-seata/pull/6765)] 
改进MySQL驱动加载机制,将自定义类加载器替换为系统类加载器,更兼容简化流程
 - [[#6781](https://github.com/apache/incubator-seata/pull/6781)] 
修复tc下线时,由于定时任务没有先关闭,导致下线后还会被注册上,需要靠namingserver的健康检查来下线的bug
+- [[#6797](https://github.com/apache/incubator-seata/pull/6797)] 
当查询的集群地址为空时,获取可用的任意集群地址
 
 
 ### optimize:
@@ -73,7 +74,6 @@
 - [[#6793](https://github.com/apache/incubator-seata/pull/6793)] 修复 npmjs 
依赖冲突问题
 - [[#6794](https://github.com/apache/incubator-seata/pull/6794)] 优化 
NacosMockTest 单测问题
 
-
 ### refactor:
 
 
@@ -117,6 +117,7 @@
 - [imashimaro](https://github.com/hmj776521114)
 - [lyl2008dsg](https://github.com/lyl2008dsg)
 - [l81893521](https://github.com/l81893521)
+- [laywin](https://github.com/laywin)
 
 
 
diff --git 
a/discovery/seata-discovery-consul/src/main/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java
 
b/discovery/seata-discovery-consul/src/main/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java
index e44ac33b3a..72fa626b1f 100644
--- 
a/discovery/seata-discovery-consul/src/main/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java
+++ 
b/discovery/seata-discovery-consul/src/main/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java
@@ -75,6 +75,8 @@ public class ConsulRegistryServiceImpl implements 
RegistryService<ConsulListener
     private static final int THREAD_POOL_NUM = 1;
     private static final int MAP_INITIAL_CAPACITY = 8;
 
+    private String transactionServiceGroup;
+
     /**
      * default tcp check interval
      */
@@ -161,6 +163,7 @@ public class ConsulRegistryServiceImpl implements 
RegistryService<ConsulListener
 
     @Override
     public List<InetSocketAddress> lookup(String key) throws Exception {
+        transactionServiceGroup = key;
         final String cluster = getServiceGroup(key);
         if (cluster == null) {
             String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + 
PREFIX_SERVICE_MAPPING + key;
@@ -311,7 +314,7 @@ public class ConsulRegistryServiceImpl implements 
RegistryService<ConsulListener
 
         clusterAddressMap.put(cluster, addresses);
 
-        removeOfflineAddressesIfNecessary(cluster, addresses);
+        removeOfflineAddressesIfNecessary(transactionServiceGroup, cluster, 
addresses);
     }
 
     /**
diff --git 
a/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java
 
b/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java
index fd9561434f..efc997b20b 100644
--- 
a/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java
+++ 
b/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java
@@ -16,8 +16,10 @@
  */
 package org.apache.seata.discovery.registry;
 
+import org.apache.seata.common.util.CollectionUtils;
+import org.apache.seata.config.ConfigurationFactory;
+
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -27,8 +29,6 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
-import org.apache.seata.config.ConfigurationFactory;
-
 /**
  * The interface Registry service.
  *
@@ -54,7 +54,7 @@ public interface RegistryService<T> {
     /**
      * Service node health check
      */
-    Map<String,List<InetSocketAddress>> CURRENT_ADDRESS_MAP = new 
ConcurrentHashMap<>();
+    Map<String, Map<String, List<InetSocketAddress>>> CURRENT_ADDRESS_MAP = 
new ConcurrentHashMap<>();
     /**
      * Register.
      *
@@ -119,12 +119,29 @@ public interface RegistryService<T> {
     }
 
     default List<InetSocketAddress> aliveLookup(String 
transactionServiceGroup) {
-        return 
CURRENT_ADDRESS_MAP.computeIfAbsent(getServiceGroup(transactionServiceGroup), k 
-> new ArrayList<>());
+        Map<String, List<InetSocketAddress>> clusterAddressMap = 
CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup,
+            k -> new ConcurrentHashMap<>());
+
+        String clusterName = getServiceGroup(transactionServiceGroup);
+        List<InetSocketAddress> inetSocketAddresses = 
clusterAddressMap.get(clusterName);
+        if (CollectionUtils.isNotEmpty(inetSocketAddresses)) {
+            return inetSocketAddresses;
+        }
+
+        // fall back to addresses of any cluster
+        return 
clusterAddressMap.values().stream().filter(CollectionUtils::isNotEmpty)
+                .findAny().orElse(Collections.emptyList());
     }
 
     default List<InetSocketAddress> refreshAliveLookup(String 
transactionServiceGroup,
         List<InetSocketAddress> aliveAddress) {
-        return 
CURRENT_ADDRESS_MAP.put(getServiceGroup(transactionServiceGroup), aliveAddress);
+
+        Map<String, List<InetSocketAddress>> clusterAddressMap = 
CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup,
+            key -> new ConcurrentHashMap<>());
+
+        String clusterName = getServiceGroup(transactionServiceGroup);
+
+        return clusterAddressMap.put(clusterName, aliveAddress);
     }
 
 
@@ -137,15 +154,21 @@ public interface RegistryService<T> {
      * @param clusterName
      * @param newAddressed
      */
-    default void removeOfflineAddressesIfNecessary(String clusterName, 
Collection<InetSocketAddress> newAddressed) {
+    default void removeOfflineAddressesIfNecessary(String 
transactionGroupService, String clusterName, Collection<InetSocketAddress> 
newAddressed) {
+
+        Map<String, List<InetSocketAddress>> clusterAddressMap = 
CURRENT_ADDRESS_MAP.computeIfAbsent(transactionGroupService,
+            key -> new ConcurrentHashMap<>());
 
-        List<InetSocketAddress> currentAddresses = 
CURRENT_ADDRESS_MAP.getOrDefault(clusterName, Collections.emptyList());
+        List<InetSocketAddress> currentAddresses = 
clusterAddressMap.getOrDefault(clusterName, Collections.emptyList());
 
         List<InetSocketAddress> inetSocketAddresses = currentAddresses
                 .stream().filter(newAddressed::contains).collect(
                         Collectors.toList());
 
-        CURRENT_ADDRESS_MAP.put(clusterName, inetSocketAddresses);
+        // prevent empty update
+        if (CollectionUtils.isNotEmpty(inetSocketAddresses)) {
+            clusterAddressMap.put(clusterName, inetSocketAddresses);
+        }
     }
 
 }
diff --git 
a/discovery/seata-discovery-etcd3/src/main/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java
 
b/discovery/seata-discovery-etcd3/src/main/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java
index 70e3dfe852..96d3cf0511 100644
--- 
a/discovery/seata-discovery-etcd3/src/main/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java
+++ 
b/discovery/seata-discovery-etcd3/src/main/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java
@@ -75,6 +75,8 @@ public class EtcdRegistryServiceImpl implements 
RegistryService<Watch.Listener>
     private static final int MAP_INITIAL_CAPACITY = 8;
     private static final int THREAD_POOL_SIZE = 2;
     private ExecutorService executorService;
+
+    private String transactionServiceGroup;
     /**
      * TTL for lease
      */
@@ -181,6 +183,7 @@ public class EtcdRegistryServiceImpl implements 
RegistryService<Watch.Listener>
 
     @Override
     public List<InetSocketAddress> lookup(String key) throws Exception {
+        transactionServiceGroup = key;
         final String cluster = getServiceGroup(key);
         if (cluster == null) {
             String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + 
PREFIX_SERVICE_MAPPING + key;
@@ -252,7 +255,7 @@ public class EtcdRegistryServiceImpl implements 
RegistryService<Watch.Listener>
         }).collect(Collectors.toList());
         clusterAddressMap.put(cluster, new 
Pair<>(getResponse.getHeader().getRevision(), instanceList));
 
-        removeOfflineAddressesIfNecessary(cluster, instanceList);
+        removeOfflineAddressesIfNecessary(transactionServiceGroup, cluster, 
instanceList);
     }
 
     /**
diff --git 
a/discovery/seata-discovery-eureka/src/main/java/org/apache/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java
 
b/discovery/seata-discovery-eureka/src/main/java/org/apache/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java
index 80d640c57c..5ab5191234 100644
--- 
a/discovery/seata-discovery-eureka/src/main/java/org/apache/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java
+++ 
b/discovery/seata-discovery-eureka/src/main/java/org/apache/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java
@@ -75,6 +75,8 @@ public class EurekaRegistryServiceImpl implements 
RegistryService<EurekaEventLis
     private static volatile EurekaRegistryServiceImpl instance;
     private static volatile EurekaClient eurekaClient;
 
+    private String transactionServiceGroup;
+
     private EurekaRegistryServiceImpl() {
     }
 
@@ -130,6 +132,7 @@ public class EurekaRegistryServiceImpl implements 
RegistryService<EurekaEventLis
 
     @Override
     public List<InetSocketAddress> lookup(String key) throws Exception {
+        transactionServiceGroup = key;
         String clusterName = getServiceGroup(key);
         if (clusterName == null) {
             String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + 
PREFIX_SERVICE_MAPPING + key;
@@ -169,7 +172,7 @@ public class EurekaRegistryServiceImpl implements 
RegistryService<EurekaEventLis
                     .collect(Collectors.toList());
             CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList);
 
-            removeOfflineAddressesIfNecessary(clusterName, newAddressList);
+            removeOfflineAddressesIfNecessary(transactionServiceGroup, 
clusterName, newAddressList);
         }
     }
 
diff --git 
a/discovery/seata-discovery-nacos/src/main/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java
 
b/discovery/seata-discovery-nacos/src/main/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java
index 7ddda0d0e7..1f6abccba4 100644
--- 
a/discovery/seata-discovery-nacos/src/main/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java
+++ 
b/discovery/seata-discovery-nacos/src/main/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java
@@ -84,6 +84,8 @@ public class NacosRegistryServiceImpl implements 
RegistryService<EventListener>
     private static final Pattern DEFAULT_SLB_REGISTRY_PATTERN = 
Pattern.compile("(?!.*internal)(?=.*seata).*mse.aliyuncs.com");
     private static volatile Boolean useSLBWay;
 
+    private String transactionServiceGroup;
+
     private NacosRegistryServiceImpl() {
         String configForNacosSLB = 
FILE_CONFIG.getConfig(getNacosUrlPatternOfSLB());
         Pattern patternOfNacosRegistryForSLB = 
StringUtils.isBlank(configForNacosSLB)
@@ -193,7 +195,7 @@ public class NacosRegistryServiceImpl implements 
RegistryService<EventListener>
                                     .collect(Collectors.toList());
                             CLUSTER_ADDRESS_MAP.put(clusterName, 
newAddressList);
 
-                            removeOfflineAddressesIfNecessary(clusterName, 
newAddressList);
+                            
removeOfflineAddressesIfNecessary(transactionServiceGroup, clusterName, 
newAddressList);
                         }
                     });
                 }
diff --git 
a/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java
 
b/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java
index 2781f4687b..a479fe69d1 100644
--- 
a/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java
+++ 
b/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java
@@ -16,16 +16,16 @@
  */
 package org.apache.seata.discovery.registry.namingserver;
 
-
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.rmi.RemoteException;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
+import java.util.HashMap;
+import java.util.Objects;
 import java.util.Map;
+import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Objects;
+import java.util.Collections;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledExecutorService;
@@ -41,22 +41,23 @@ import java.util.stream.Collectors;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.HttpStatus;
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.entity.ContentType;
 import org.apache.http.protocol.HTTP;
+import org.apache.http.util.EntityUtils;
 import org.apache.seata.common.metadata.Node;
 import org.apache.seata.common.metadata.namingserver.Instance;
 import org.apache.seata.common.metadata.namingserver.MetaResponse;
 import org.apache.seata.common.thread.NamedThreadFactory;
+import org.apache.seata.common.util.CollectionUtils;
+import org.apache.seata.common.util.HttpClientUtil;
 import org.apache.seata.common.util.NetUtil;
 import org.apache.seata.common.util.StringUtils;
 import org.apache.seata.config.Configuration;
 import org.apache.seata.config.ConfigurationFactory;
-import org.apache.seata.common.util.HttpClientUtil;
 import org.apache.seata.discovery.registry.RegistryService;
-import org.apache.http.HttpStatus;
-import org.apache.http.StatusLine;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -322,17 +323,6 @@ public class NamingserverRegistryServiceImpl implements 
RegistryService<NamingLi
         isSubscribed = false;
     }
 
-    @Override
-    public List<InetSocketAddress> aliveLookup(String transactionServiceGroup) 
{
-        return CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup, k 
-> new ArrayList<>());
-    }
-
-    @Override
-    public List<InetSocketAddress> refreshAliveLookup(String 
transactionServiceGroup,
-                                                      List<InetSocketAddress> 
aliveAddress) {
-        return CURRENT_ADDRESS_MAP.put(transactionServiceGroup, aliveAddress);
-    }
-
     /**
      * @param key vGroup name
      * @return List<InetSocketAddress> available instance list
@@ -413,6 +403,31 @@ public class NamingserverRegistryServiceImpl implements 
RegistryService<NamingLi
         return namespace;
     }
 
+    @Override
+    public List<InetSocketAddress> aliveLookup(String transactionServiceGroup) 
{
+        Map<String, List<InetSocketAddress>> clusterAddressMap = 
CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup,
+            k -> new ConcurrentHashMap<>());
+
+        List<InetSocketAddress> inetSocketAddresses = 
clusterAddressMap.get(transactionServiceGroup);
+        if (CollectionUtils.isNotEmpty(inetSocketAddresses)) {
+            return inetSocketAddresses;
+        }
+
+        // fall back to addresses of any cluster
+        return 
clusterAddressMap.values().stream().filter(CollectionUtils::isNotEmpty)
+                .findAny().orElse(Collections.emptyList());
+    }
+
+    @Override
+    public List<InetSocketAddress> refreshAliveLookup(String 
transactionServiceGroup,
+                                                      List<InetSocketAddress> 
aliveAddress) {
+        Map<String, List<InetSocketAddress>> clusterAddressMap = 
CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup,
+            key -> new ConcurrentHashMap<>());
+
+
+        return clusterAddressMap.put(transactionServiceGroup, aliveAddress);
+    }
+
 
     /**
      * get one namingserver url
diff --git 
a/discovery/seata-discovery-redis/src/main/java/org/apache/seata/discovery/registry/redis/RedisRegistryServiceImpl.java
 
b/discovery/seata-discovery-redis/src/main/java/org/apache/seata/discovery/registry/redis/RedisRegistryServiceImpl.java
index 769799f693..6c18a57e72 100644
--- 
a/discovery/seata-discovery-redis/src/main/java/org/apache/seata/discovery/registry/redis/RedisRegistryServiceImpl.java
+++ 
b/discovery/seata-discovery-redis/src/main/java/org/apache/seata/discovery/registry/redis/RedisRegistryServiceImpl.java
@@ -74,6 +74,8 @@ public class RedisRegistryServiceImpl implements 
RegistryService<RedisListener>
     private static final long KEY_TTL = 5L;
     private static final long KEY_REFRESH_PERIOD = 2000L;
 
+    private String transactionServiceGroup;
+
     private ScheduledExecutorService threadPoolExecutorForSubscribe = new 
ScheduledThreadPoolExecutor(1,
             new NamedThreadFactory("RedisRegistryService-subscribe", 1));
     private ScheduledExecutorService threadPoolExecutorForUpdateMap = new 
ScheduledThreadPoolExecutor(1,
@@ -219,6 +221,7 @@ public class RedisRegistryServiceImpl implements 
RegistryService<RedisListener>
 
     @Override
     public List<InetSocketAddress> lookup(String key) {
+        transactionServiceGroup = key;
         String clusterName = getServiceGroup(key);
         if (clusterName == null) {
             String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + 
PREFIX_SERVICE_MAPPING + key;
@@ -280,7 +283,7 @@ public class RedisRegistryServiceImpl implements 
RegistryService<RedisListener>
         }
         socketAddresses.remove(inetSocketAddress);
 
-        removeOfflineAddressesIfNecessary(notifyCluserName, socketAddresses);
+        removeOfflineAddressesIfNecessary(transactionServiceGroup, 
notifyCluserName, socketAddresses);
     }
 
     @Override
diff --git 
a/discovery/seata-discovery-sofa/src/main/java/org/apache/seata/discovery/registry/sofa/SofaRegistryServiceImpl.java
 
b/discovery/seata-discovery-sofa/src/main/java/org/apache/seata/discovery/registry/sofa/SofaRegistryServiceImpl.java
index a7e4e087fa..fa0b428dda 100644
--- 
a/discovery/seata-discovery-sofa/src/main/java/org/apache/seata/discovery/registry/sofa/SofaRegistryServiceImpl.java
+++ 
b/discovery/seata-discovery-sofa/src/main/java/org/apache/seata/discovery/registry/sofa/SofaRegistryServiceImpl.java
@@ -81,6 +81,8 @@ public class SofaRegistryServiceImpl implements 
RegistryService<SubscriberDataOb
 
     private static volatile SofaRegistryServiceImpl instance;
 
+    private String transactionServiceGroup;
+
     private SofaRegistryServiceImpl() {
     }
 
@@ -159,6 +161,7 @@ public class SofaRegistryServiceImpl implements 
RegistryService<SubscriberDataOb
 
     @Override
     public List<InetSocketAddress> lookup(String key) throws Exception {
+        transactionServiceGroup = key;
         String clusterName = getServiceGroup(key);
         if (clusterName == null) {
             String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + 
PREFIX_SERVICE_MAPPING + key;
@@ -174,7 +177,7 @@ public class SofaRegistryServiceImpl implements 
RegistryService<SubscriberDataOb
                     List<InetSocketAddress> newAddressList = 
flatData(instances);
                     CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList);
 
-                    removeOfflineAddressesIfNecessary(clusterName, 
newAddressList);
+                    removeOfflineAddressesIfNecessary(transactionServiceGroup, 
clusterName, newAddressList);
                 }
                 respondRegistries.countDown();
             });
diff --git 
a/discovery/seata-discovery-zk/src/main/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java
 
b/discovery/seata-discovery-zk/src/main/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java
index 9b112b1efb..84e70abe04 100644
--- 
a/discovery/seata-discovery-zk/src/main/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java
+++ 
b/discovery/seata-discovery-zk/src/main/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java
@@ -78,6 +78,8 @@ public class ZookeeperRegisterServiceImpl implements 
RegistryService<IZkChildLis
     private static final int REGISTERED_PATH_SET_SIZE = 1;
     private static final Set<String> REGISTERED_PATH_SET = 
Collections.synchronizedSet(new HashSet<>(REGISTERED_PATH_SET_SIZE));
 
+    private String transactionServiceGroup;
+
     private ZookeeperRegisterServiceImpl() {
     }
 
@@ -175,6 +177,7 @@ public class ZookeeperRegisterServiceImpl implements 
RegistryService<IZkChildLis
      */
     @Override
     public List<InetSocketAddress> lookup(String key) throws Exception {
+        transactionServiceGroup = key;
         String clusterName = getServiceGroup(key);
 
         if (clusterName == null) {
@@ -309,7 +312,7 @@ public class ZookeeperRegisterServiceImpl implements 
RegistryService<IZkChildLis
         }
         CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList);
 
-        removeOfflineAddressesIfNecessary(clusterName, newAddressList);
+        removeOfflineAddressesIfNecessary(transactionServiceGroup, 
clusterName, newAddressList);
     }
 
     private String getClusterName() {
diff --git 
a/discovery/seata-discovery-zk/src/test/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java
 
b/discovery/seata-discovery-zk/src/test/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java
index cd7afa792f..dffd1a279a 100644
--- 
a/discovery/seata-discovery-zk/src/test/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java
+++ 
b/discovery/seata-discovery-zk/src/test/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java
@@ -18,9 +18,7 @@ package org.apache.seata.discovery.registry.zk;
 
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -128,18 +126,22 @@ public class ZookeeperRegisterServiceImplTest {
 
     @Test
     public void testRemoveOfflineAddressesIfNecessaryNoRemoveCase() {
-        service.CURRENT_ADDRESS_MAP.put("cluster", 
Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091)));
-        service.removeOfflineAddressesIfNecessary("cluster", 
Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091)));
+        Map<String, List<InetSocketAddress>> addresses = 
service.CURRENT_ADDRESS_MAP.computeIfAbsent("default_tx_group", k -> new 
HashMap<>());
+        addresses.put("cluster", Collections.singletonList(new 
InetSocketAddress("127.0.0.1", 8091)));
+        
service.removeOfflineAddressesIfNecessary("default_tx_group","cluster", 
Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091)));
 
-        Assertions.assertEquals(1, 
service.CURRENT_ADDRESS_MAP.get("cluster").size());
+        Assertions.assertEquals(1, 
service.CURRENT_ADDRESS_MAP.get("default_tx_group").get("cluster").size());
     }
 
     @Test
-    public void testRemoveOfflineAddressesIfNecessaryRemoveCase() {
-        service.CURRENT_ADDRESS_MAP.put("cluster", 
Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091)));
-        service.removeOfflineAddressesIfNecessary("cluster", 
Collections.singletonList(new InetSocketAddress("127.0.0.2", 8091)));
+    public void testRemovePreventEmptyPushCase() {
+        Map<String, List<InetSocketAddress>> addresses = 
service.CURRENT_ADDRESS_MAP.computeIfAbsent("default_tx_group", k -> new 
HashMap<>());
 
-        Assertions.assertEquals(0, 
service.CURRENT_ADDRESS_MAP.get("cluster").size());
+        addresses.put("cluster", Collections.singletonList(new 
InetSocketAddress("127.0.0.1", 8091)));
+
+        service.removeOfflineAddressesIfNecessary("default_tx_group", 
"cluster", Collections.singletonList(new InetSocketAddress("127.0.0.2", 8091)));
+
+        Assertions.assertEquals(1, 
service.CURRENT_ADDRESS_MAP.get("default_tx_group").get("cluster").size());
     }
 
     @Test
@@ -148,7 +150,8 @@ public class ZookeeperRegisterServiceImplTest {
         System.setProperty("txServiceGroup", "default_tx_group");
         System.setProperty("service.vgroupMapping.default_tx_group", 
"cluster");
 
-        service.CURRENT_ADDRESS_MAP.put("cluster", 
Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091)));
+        Map<String, List<InetSocketAddress>> addresses = 
service.CURRENT_ADDRESS_MAP.computeIfAbsent("default_tx_group", k -> new 
HashMap<>());
+        addresses.put("cluster", Collections.singletonList(new 
InetSocketAddress("127.0.0.1", 8091)));
         List<InetSocketAddress> result = 
service.aliveLookup("default_tx_group");
 
         Assertions.assertEquals(result, Collections.singletonList(new 
InetSocketAddress("127.0.0.1", 8091)));
@@ -161,12 +164,11 @@ public class ZookeeperRegisterServiceImplTest {
         System.setProperty("txServiceGroup", "default_tx_group");
         System.setProperty("service.vgroupMapping.default_tx_group", 
"cluster");
 
-        service.CURRENT_ADDRESS_MAP.put("cluster", 
Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091)));
 
         service.refreshAliveLookup("default_tx_group",
                 Collections.singletonList(new InetSocketAddress("127.0.0.2", 
8091)));
 
-        Assertions.assertEquals(service.CURRENT_ADDRESS_MAP.get("cluster"),
+        
Assertions.assertEquals(service.CURRENT_ADDRESS_MAP.get("default_tx_group").get("cluster"),
                 Collections.singletonList(new InetSocketAddress("127.0.0.2", 
8091)));
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org
For additional commands, e-mail: notifications-h...@seata.apache.org

Reply via email to