This is an automated email from the ASF dual-hosted git repository.
xingfudeshi pushed a commit to branch gsoc-2025-meta-registry
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git
The following commit(s) were added to refs/heads/gsoc-2025-meta-registry by
this push:
new c3cc50afb6 optimize: metadata discovery support for nacos (#7746)
c3cc50afb6 is described below
commit c3cc50afb6c1063d115db00c5c5c75a96cecc477
Author: YoWuwuuuw <[email protected]>
AuthorDate: Wed Dec 10 14:22:52 2025 +0800
optimize: metadata discovery support for nacos (#7746)
---
.../seata/common/metadata/ServiceInstance.java | 17 ++
.../common/loader/EnhancedServiceLoaderTest.java | 2 +-
.../registry/nacos/NacosRegistryServiceImpl.java | 26 ++-
.../nacos/NacosRegistryServiceImplTest.java | 238 +++++++++++++++++++--
.../test/resources/{registry.conf => file.conf} | 36 +---
.../src/test/resources/registry.conf | 3 +-
6 files changed, 264 insertions(+), 58 deletions(-)
diff --git a/common/src/main/java/org/apache/seata/common/metadata/ServiceInstance.java b/common/src/main/java/org/apache/seata/common/metadata/ServiceInstance.java
index ef9e1a7421..67d8cc7341 100644
--- a/common/src/main/java/org/apache/seata/common/metadata/ServiceInstance.java
+++ b/common/src/main/java/org/apache/seata/common/metadata/ServiceInstance.java
@@ -97,6 +97,23 @@ public class ServiceInstance {
return serviceInstances;
}
+ /**
+ * Converts a Map<String, Object> to Map<String, String>.
+ * @param metadata the original metadata
+ * @return converted Map<String, String>
+ */
+ public static Map<String, String> toStringMap(Map<String, Object> metadata) {
+ Map<String, String> stringMap = new HashMap<>();
+ if (metadata != null) {
+ for (Map.Entry<String, Object> entry : metadata.entrySet()) {
+ stringMap.put(
+ entry.getKey(),
+ entry.getValue() == null ? null : entry.getValue().toString());
+ }
+ }
+ return stringMap;
+ }
+
/**
* Creates a ServiceInstance from an InetSocketAddress and a Map<String, String> of metadata.
* @param address the InetSocketAddress
diff --git a/common/src/test/java/org/apache/seata/common/loader/EnhancedServiceLoaderTest.java b/common/src/test/java/org/apache/seata/common/loader/EnhancedServiceLoaderTest.java
index 7997804874..465e0b2016 100644
--- a/common/src/test/java/org/apache/seata/common/loader/EnhancedServiceLoaderTest.java
+++ b/common/src/test/java/org/apache/seata/common/loader/EnhancedServiceLoaderTest.java
@@ -179,7 +179,7 @@ public class EnhancedServiceLoaderTest {
}
// FIXME: 2023/2/11 wait fix EnhancedServiceLoader.unload(Class<S> service, String activateName)
- // @Test
+ @Test
public void testUnloadByClassAndActivateName() throws NoSuchFieldException, IllegalAccessException {
Hello englishHello = EnhancedServiceLoader.load(Hello.class, "EnglishHello");
assertThat(englishHello.say()).isEqualTo("hello!");
diff --git a/discovery/seata-discovery-nacos/src/main/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java b/discovery/seata-discovery-nacos/src/main/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java
index d0a1dc33de..39644d65e3 100644
--- a/discovery/seata-discovery-nacos/src/main/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java
+++ b/discovery/seata-discovery-nacos/src/main/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java
@@ -117,26 +117,14 @@ public class NacosRegistryServiceImpl implements RegistryService<EventListener>
public void register(ServiceInstance instance) throws Exception {
InetSocketAddress address = instance.getAddress();
NetUtil.validAddress(address);
- getNamingInstance()
- .registerInstance(
- getServiceName(),
- getServiceGroup(),
- address.getAddress().getHostAddress(),
- address.getPort(),
- getClusterName());
+ getNamingInstance().registerInstance(getServiceName(), getServiceGroup(), getNacosInstance(instance));
}
@Override
public void unregister(ServiceInstance instance) throws Exception {
InetSocketAddress address = instance.getAddress();
NetUtil.validAddress(address);
- getNamingInstance()
- .deregisterInstance(
- getServiceName(),
- getServiceGroup(),
- address.getAddress().getHostAddress(),
- address.getPort(),
- getClusterName());
+ getNamingInstance().deregisterInstance(getServiceName(), getServiceGroup(), getNacosInstance(instance));
}
@Override
@@ -252,6 +240,16 @@ public class NacosRegistryServiceImpl implements RegistryService<EventListener>
}
}
+ private Instance getNacosInstance(ServiceInstance instance) {
+ Instance nacosInstance = new Instance();
+ nacosInstance.setClusterName(getClusterName());
+ InetSocketAddress address = instance.getAddress();
+ nacosInstance.setIp(address.getAddress().getHostAddress());
+ nacosInstance.setPort(address.getPort());
+ nacosInstance.setMetadata(ServiceInstance.toStringMap(instance.getMetadata()));
+ return nacosInstance;
+ }
+
/**
* Gets naming instance.
*
diff --git a/discovery/seata-discovery-nacos/src/test/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImplTest.java b/discovery/seata-discovery-nacos/src/test/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImplTest.java
index 344942a35d..7908248c4e 100644
--- a/discovery/seata-discovery-nacos/src/test/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImplTest.java
+++ b/discovery/seata-discovery-nacos/src/test/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImplTest.java
@@ -16,32 +16,233 @@
*/
package org.apache.seata.discovery.registry.nacos;
-import org.apache.seata.common.util.ReflectionUtil;
-import org.assertj.core.api.Assertions;
+import com.alibaba.nacos.api.naming.listener.Event;
+import com.alibaba.nacos.api.naming.listener.EventListener;
+import com.alibaba.nacos.api.naming.listener.NamingEvent;
+import com.alibaba.nacos.api.naming.pojo.Instance;
+import org.apache.seata.common.metadata.ServiceInstance;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.util.Properties;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
-/**
- * The type Nacos registry serivce impl test
- */
+@EnabledIfSystemProperty(named = "nacosCaseEnabled", matches = "true")
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class NacosRegistryServiceImplTest {
+ private static final String SERVICE_NAME = "default_tx_group";
+ private static final String CLUSTER_NAME = "default";
+
+ private static final NacosRegistryServiceImpl service = NacosRegistryServiceImpl.getInstance();
+
+ @Test
+ public void testGetInstance() {
+ NacosRegistryServiceImpl instance = NacosRegistryServiceImpl.getInstance();
+ assertInstanceOf(NacosRegistryServiceImpl.class, instance);
+ }
+
+ @Test
+ @Order(1)
+ public void testRegister() throws Exception {
+ InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8091);
+ Map<String, Object> metadata = new HashMap<>();
+ metadata.put("version", "1.0.0");
+ metadata.put("weight", 1.0);
+ metadata.put("healthy", true);
+
+ ServiceInstance serviceInstance = new ServiceInstance(address, metadata);
+
+ // Verify ServiceInstance metadata
+ assertNotNull(serviceInstance.getMetadata());
+ assertEquals("1.0.0", serviceInstance.getMetadata().get("version"));
+ assertEquals(1.0, serviceInstance.getMetadata().get("weight"));
+ assertEquals(true, serviceInstance.getMetadata().get("healthy"));
+
+ service.register(serviceInstance);
+
+ // Verify registration success
+ long startTime = System.currentTimeMillis();
+ while (service.lookup(SERVICE_NAME).isEmpty() && System.currentTimeMillis() - startTime < 10000) {
+ Thread.sleep(100);
+ }
+
+ List<ServiceInstance> instances = service.lookup(SERVICE_NAME);
+ assertFalse(instances.isEmpty());
+
+ // Cleanup
+ service.unregister(serviceInstance);
+ }
+
+ @Test
+ public void testRegisterWithInvalidAddress() {
+ assertThrows(IllegalArgumentException.class, () -> {
+ InetSocketAddress invalidAddress = new InetSocketAddress("127.0.0.1", 0);
+ ServiceInstance invalidInstance = new ServiceInstance(invalidAddress, new HashMap<>());
+ service.register(invalidInstance);
+ });
+ }
+
+ @Test
+ @Order(2)
+ public void testUnregister() throws Exception {
+ InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8092);
+ Map<String, Object> metadata = new HashMap<>();
+ metadata.put("version", "1.0.0");
+ metadata.put("weight", 1.0);
+
+ ServiceInstance serviceInstance = new ServiceInstance(address, metadata);
+ InetSocketAddress address1 = new InetSocketAddress("127.0.0.1", 8093);
+
+ ServiceInstance serviceInstance1 = new ServiceInstance(address1, metadata);
+
+ // Verify ServiceInstance metadata
+ assertNotNull(serviceInstance.getMetadata());
+ assertEquals("1.0.0", serviceInstance.getMetadata().get("version"));
+ assertEquals(1.0, serviceInstance.getMetadata().get("weight"));
+
+ service.register(serviceInstance);
+ service.register(serviceInstance1);
+
+ long startTime = System.currentTimeMillis();
+ while (service.lookup(SERVICE_NAME).isEmpty() && System.currentTimeMillis() - startTime < 10000) {
+ Thread.sleep(100);
+ }
+
+ List<ServiceInstance> instancesBefore = service.lookup(SERVICE_NAME);
+ assertFalse(instancesBefore.isEmpty());
+
+ service.unregister(serviceInstance);
+
+ startTime = System.currentTimeMillis();
+ while (!service.lookup(SERVICE_NAME).isEmpty() && System.currentTimeMillis() - startTime < 10000) {
+ Thread.sleep(100);
+ }
+
+ // Verify unregistration success
+ List<ServiceInstance> instancesAfter = service.lookup(SERVICE_NAME);
+ assertEquals(1, instancesAfter.size());
+ }
+
+ @Test
+ @Order(3)
+ public void testSubscribe() throws Exception {
+ // First, clean up any existing instances to ensure a clean state
+
+ List<ServiceInstance> existingInstances = service.lookup(SERVICE_NAME);
+ for (ServiceInstance instance : existingInstances) {
+ service.unregister(instance);
+ }
+ Thread.sleep(1000);
+
+ CountDownLatch latch = new CountDownLatch(1);
+ final boolean[] eventReceived = {false};
+
+ // Create test listener
+ EventListener listener = new EventListener() {
+ @Override
+ public void onEvent(Event event) {
+ if (event instanceof NamingEvent) {
+ NamingEvent namingEvent = (NamingEvent) event;
+ List<Instance> instances = namingEvent.getInstances();
+ if (instances != null && !instances.isEmpty()) {
+ // Verify instance metadata
+ Instance instance = instances.get(0);
+ Map<String, String> metadata = instance.getMetadata();
+ assertNotNull(metadata);
+ assertEquals("1.0.0", metadata.get("version"));
+ assertEquals("1.0", metadata.get("weight"));
+ assertEquals("true", metadata.get("healthy"));
+ eventReceived[0] = true;
+ }
+ }
+ latch.countDown();
+ }
+ };
+
+ // Execute subscription
+ service.subscribe(CLUSTER_NAME, listener);
+
+ // Wait a bit for subscription to be established
+ Thread.sleep(1000);
+
+ // Register a service instance with metadata to trigger event
+ InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8094);
+ Map<String, Object> metadata = new HashMap<>();
+ metadata.put("version", "1.0.0");
+ metadata.put("weight", 1.0);
+ metadata.put("healthy", true);
+
+ ServiceInstance serviceInstance = new ServiceInstance(address, metadata);
+ service.register(serviceInstance);
+
+ // Wait for event trigger
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertTrue(eventReceived[0]);
+
+ // Cleanup
+ service.unregister(serviceInstance);
+ service.unsubscribe(CLUSTER_NAME, listener);
+ }
+
@Test
- public void testGetConfigProperties() throws Exception {
- Method method = ReflectionUtil.getMethod(NacosRegistryServiceImpl.class, "getNamingProperties");
- Properties properties = (Properties) ReflectionUtil.invokeMethod(null, method);
- Assertions.assertThat(properties.getProperty("contextPath")).isEqualTo("/foo");
- System.setProperty("contextPath", "/bar");
- properties = (Properties) ReflectionUtil.invokeMethod(null, method);
- Assertions.assertThat(properties.getProperty("contextPath")).isEqualTo("/bar");
+ @Order(4)
+ public void testUnsubscribe() throws Exception {
+ EventListener listener = new EventListener() {
+ @Override
+ public void onEvent(Event event) {}
+ };
+
+ // Subscribe first
+ service.subscribe(CLUSTER_NAME, listener);
+
+ // Verify listener is added to LISTENER_SERVICE_MAP
+ ConcurrentMap<String, List<EventListener>> listenersMap = getListenersMap();
+ boolean found = false;
+ for (List<EventListener> listeners : listenersMap.values()) {
+ if (listeners.contains(listener)) {
+ found = true;
+ break;
+ }
+ }
+ assertTrue(found);
+
+ // Unsubscribe
+ service.unsubscribe(CLUSTER_NAME, listener);
+
+ // Verify listener is removed from LISTENER_SERVICE_MAP
+ found = false;
+ for (List<EventListener> listeners : listenersMap.values()) {
+ if (listeners.contains(listener)) {
+ found = true;
+ break;
+ }
+ }
+ assertFalse(found);
+
+ // Verify listener is not null
+ assertNotNull(listener);
}
@Test
+ @Order(5)
public void testClose() throws Exception {
NacosRegistryServiceImpl instance = NacosRegistryServiceImpl.getInstance();
NacosRegistryServiceImpl.getNamingInstance();
@@ -61,4 +262,13 @@ public class NacosRegistryServiceImplTest {
namingMaintainField.setAccessible(true);
assertNull(namingMaintainField.get(null));
}
+
+ private ConcurrentMap<String, List<EventListener>> getListenersMap() throws Exception {
+ Class<?> clazz = NacosRegistryServiceImpl.class;
+
+ Field listenerServiceMapField = clazz.getDeclaredField("LISTENER_SERVICE_MAP");
+ listenerServiceMapField.setAccessible(true);
+
+ return (ConcurrentMap<String, List<EventListener>>) listenerServiceMapField.get(null);
+ }
}
diff --git a/discovery/seata-discovery-nacos/src/test/resources/registry.conf b/discovery/seata-discovery-nacos/src/test/resources/file.conf
similarity index 53%
copy from discovery/seata-discovery-nacos/src/test/resources/registry.conf
copy to discovery/seata-discovery-nacos/src/test/resources/file.conf
index 1fd13bc940..84f2276c66 100644
--- a/discovery/seata-discovery-nacos/src/test/resources/registry.conf
+++ b/discovery/seata-discovery-nacos/src/test/resources/file.conf
@@ -15,31 +15,11 @@
# limitations under the License.
#
-registry {
- # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa、custom
- type = "nacos"
-
- nacos {
- application = "seata-server"
- serverAddr = "127.0.0.1:8848"
- group = "SEATA_GROUP"
- namespace = ""
- username = ""
- password = ""
- contextPath = "/foo"
- ##if use MSE Nacos with auth, mutex with username/password attribute
- #accessKey = ""
- #secretKey = ""
- ##if use Nacos naming meta-data for SLB service registry, specify nacos address pattern rules here
- #slbPattern = ""
- }
-}
-
-config {
- # file、nacos 、apollo、zk、consul、etcd3、springCloudConfig、custom
- type = "file"
-
- file {
- name = "file.conf"
- }
-}
+service {
+ # transaction service group mapping
+ vgroupMapping.default_tx_group = "default"
+ # only support when registry.type=file, please don't set multiple addresses
+ default.grouplist = "127.0.0.1:8080"
+ # disable seata
+ disableGlobalTransaction = false
+}
\ No newline at end of file
diff --git a/discovery/seata-discovery-nacos/src/test/resources/registry.conf b/discovery/seata-discovery-nacos/src/test/resources/registry.conf
index 1fd13bc940..48b5551716 100644
--- a/discovery/seata-discovery-nacos/src/test/resources/registry.conf
+++ b/discovery/seata-discovery-nacos/src/test/resources/registry.conf
@@ -23,10 +23,11 @@ registry {
application = "seata-server"
serverAddr = "127.0.0.1:8848"
group = "SEATA_GROUP"
+ cluster = "default"
namespace = ""
username = ""
password = ""
- contextPath = "/foo"
+ contextPath = ""
##if use MSE Nacos with auth, mutex with username/password attribute
#accessKey = ""
#secretKey = ""
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]