This is an automated email from the ASF dual-hosted git repository. jianbin pushed a commit to branch 2.x in repository https://gitbox.apache.org/repos/asf/incubator-seata.git
The following commit(s) were added to refs/heads/2.x by this push: new fb88638398 optimize: optimize namingserver (#6755) fb88638398 is described below commit fb886383988b4f877cd9597b81c9aba7b11b3889 Author: ggbocoder <119659920+ggboco...@users.noreply.github.com> AuthorDate: Thu Aug 15 14:58:15 2024 +0800 optimize: optimize namingserver (#6755) --- .../org/apache/seata/common/ConfigurationKeys.java | 2 +- .../common/metadata/namingserver/Instance.java | 7 ++- .../namingserver/controller/NamingController.java | 4 -- .../namingserver/entity/pojo/ClusterData.java | 4 +- .../manager/ClusterWatcherManager.java | 5 +- .../seata/namingserver/manager/NamingManager.java | 31 +++++------ .../seata/namingserver/NamingControllerTest.java | 18 ++++--- .../seata/server/controller/NamingController.java | 2 + .../apache/seata/server/session/SessionHolder.java | 5 +- .../file/store/FileVGroupMappingStoreManager.java | 60 +++++++++++++++------- .../store/RedisVGroupMappingStoreManager.java | 5 +- .../server/store/VGroupMappingStoreManager.java | 6 ++- server/src/main/resources/application.example.yml | 1 + 13 files changed, 93 insertions(+), 57 deletions(-) diff --git a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java index f88a3cd2bf..26017ff753 100644 --- a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java +++ b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java @@ -1035,7 +1035,7 @@ public interface ConfigurationKeys { /** * The constant MAPPING_TABLE_NAME */ - String MAPPING_TABLE_NAME = STORE_DB_PREFIX + FILE_CONFIG_SPLIT_CHAR + "mapping-table"; + String MAPPING_TABLE_NAME = STORE_DB_PREFIX + "mapping-table"; /** * The constant NAMESPACE_KEY diff --git a/common/src/main/java/org/apache/seata/common/metadata/namingserver/Instance.java b/common/src/main/java/org/apache/seata/common/metadata/namingserver/Instance.java index d08cd622c5..4692692c8c 100644 --- a/common/src/main/java/org/apache/seata/common/metadata/namingserver/Instance.java +++ b/common/src/main/java/org/apache/seata/common/metadata/namingserver/Instance.java @@ -17,7 +17,6 @@ package org.apache.seata.common.metadata.namingserver; - import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.seata.common.metadata.ClusterRole; @@ -133,6 +132,10 @@ public class Instance { this.timestamp = timestamp; } + public Map<String, Object> getMetadata() { + return metadata; + } + public void addMetadata(String key, Object value) { this.metadata.put(key, value); } @@ -182,7 +185,7 @@ public class Instance { resultMap.put("weight", String.valueOf(weight)); resultMap.put("healthy", String.valueOf(healthy)); resultMap.put("term", String.valueOf(term)); - resultMap.put("timestamp",String.valueOf(timestamp)); + resultMap.put("timestamp", String.valueOf(timestamp)); resultMap.put("metadata", mapToJsonString(metadata)); return resultMap; diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/controller/NamingController.java b/namingserver/src/main/java/org/apache/seata/namingserver/controller/NamingController.java index 239aec0f21..e94af26e0a 100644 --- a/namingserver/src/main/java/org/apache/seata/namingserver/controller/NamingController.java +++ b/namingserver/src/main/java/org/apache/seata/namingserver/controller/NamingController.java @@ -141,9 +141,5 @@ public class NamingController { .collect(Collectors.toList()); } - @GetMapping("/health") - public String healthCheck() { - return "ok"; - } } diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/entity/pojo/ClusterData.java b/namingserver/src/main/java/org/apache/seata/namingserver/entity/pojo/ClusterData.java index 32dd9f7aa2..ead20f4abf 100644 --- a/namingserver/src/main/java/org/apache/seata/namingserver/entity/pojo/ClusterData.java +++ b/namingserver/src/main/java/org/apache/seata/namingserver/entity/pojo/ClusterData.java @@ -139,11 +139,11 @@ public class ClusterData { // ensure that when adding an instance, the remove side will not delete the unit. lock.lock(); try { - currentUnit.addInstance(instance); + return currentUnit.addInstance(instance); } finally { lock.unlock(); } - return true; + } diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/manager/ClusterWatcherManager.java b/namingserver/src/main/java/org/apache/seata/namingserver/manager/ClusterWatcherManager.java index f3b04a473a..6c2baaf50c 100644 --- a/namingserver/src/main/java/org/apache/seata/namingserver/manager/ClusterWatcherManager.java +++ b/namingserver/src/main/java/org/apache/seata/namingserver/manager/ClusterWatcherManager.java @@ -22,6 +22,7 @@ import org.apache.seata.namingserver.listener.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.event.EventListener; +import org.springframework.http.HttpStatus; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import org.springframework.stereotype.Component; @@ -62,7 +63,7 @@ public class ClusterWatcherManager implements ClusterChangeListener { Optional.ofNullable(WATCHERS.remove(group)) .ifPresent(watchers -> watchers.parallelStream().forEach(watcher -> { if (System.currentTimeMillis() >= watcher.getTimeout()) { - notify(watcher, 304); + notify(watcher, HttpStatus.NOT_MODIFIED.value()); } if (!watcher.isDone()) { // Re-register @@ -77,7 +78,7 @@ public class ClusterWatcherManager implements ClusterChangeListener { @EventListener @Async public void onChangeEvent(ClusterChangeEvent event) { - if (event.getTerm() > 0) { + if (event.getTerm() > 0 || event.getTerm() == -1) { GROUP_UPDATE_TIME.put(event.getGroup(), event.getTerm()); // Notifications are made of changes in cluster information diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java b/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java index 39378f2397..72889b8d13 100644 --- a/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java +++ b/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java @@ -27,6 +27,7 @@ import org.apache.seata.common.metadata.namingserver.Unit; import org.apache.seata.common.result.Result; import org.apache.seata.common.util.HttpClientUtil; import org.apache.seata.common.NamingServerConstants; +import org.apache.seata.common.util.StringUtils; import org.apache.seata.namingserver.listener.ClusterChangeEvent; import org.apache.seata.namingserver.entity.pojo.ClusterData; import org.apache.seata.namingserver.entity.vo.monitor.ClusterVO; @@ -202,20 +203,18 @@ public class NamingManager { } } - public void notifyClusterChange(String namespace, String clusterName, String unitName) { + public void notifyClusterChange(String namespace, String clusterName, String unitName,long term) { for (Map.Entry<String, ConcurrentMap<String, Pair<String, String>>> entry : vGroupMap.entrySet()) { String vGroup = entry.getKey(); Map<String, Pair<String, String>> namespaceMap = entry.getValue(); // Iterating through an internal HashMap for (Map.Entry<String, Pair<String, String>> innerEntry : namespaceMap.entrySet()) { - String namespace1 = innerEntry.getKey(); + Pair<String, String> pair = innerEntry.getValue(); String clusterName1 = pair.getKey(); - String unitName1 = pair.getValue(); - if (namespace1.equals(namespace) && clusterName1.equals(clusterName) - && (unitName1 == null || unitName1.equals(unitName))) { - applicationContext.publishEvent(new ClusterChangeEvent(this, vGroup, System.currentTimeMillis())); + if (StringUtils.equals(clusterName1,clusterName)) { + applicationContext.publishEvent(new ClusterChangeEvent(this, vGroup, term)); } } } @@ -234,17 +233,19 @@ public class NamingManager { // if extended metadata includes vgroup mapping relationship, add it in clusterData Optional.ofNullable(node.getMetadata().remove(CONSTANT_GROUP)).ifPresent(mappingObj -> { - if (mappingObj instanceof List) { - List<String> vGroups = (List<String>)mappingObj; - for (String vGroup : vGroups) { - changeGroup(namespace, clusterName, unitName, vGroup); - } + if (mappingObj instanceof Map) { + Map<String, Object> vGroups = (Map) mappingObj; + vGroups.forEach((k, v) -> { + // In non-raft mode, a unit is one-to-one with a node, and the unitName is stored on the node. + // In raft mode, the unitName is equal to the raft-group, so the node's unitName cannot be used. + changeGroup(namespace, clusterName, v == null ? unitName : (String)v, k); + }); } }); boolean hasChanged = clusterData.registerInstance(node, unitName); if (hasChanged) { - notifyClusterChange(namespace, clusterName, unitName); + notifyClusterChange(namespace, clusterName, unitName,node.getTerm()); } instanceLiveTable.put( new InetSocketAddress(node.getTransaction().getHost(), node.getTransaction().getPort()), @@ -256,7 +257,7 @@ public class NamingManager { return true; } - public boolean unregisterInstance(String unitName, Node node) { + public boolean unregisterInstance(String unitName, NamingServerNode node) { try { for (String namespace : namespaceClusterDataMap.keySet()) { Map<String, ClusterData> clusterMap = namespaceClusterDataMap.get(namespace); @@ -264,7 +265,7 @@ public class NamingManager { clusterMap.forEach((clusterName, clusterData) -> { if (clusterData.getUnitData() != null && clusterData.getUnitData().containsKey(unitName)) { clusterData.removeInstance(node, unitName); - notifyClusterChange(namespace, clusterName, unitName); + notifyClusterChange(namespace, clusterName, unitName, node.getTerm()); instanceLiveTable.remove(new InetSocketAddress(node.getTransaction().getHost(), node.getTransaction().getPort())); } @@ -329,7 +330,7 @@ public class NamingManager { LOGGER.warn("{} instance has gone offline", instance.getTransaction().getHost() + ":" + instance.getTransaction().getPort()); } - notifyClusterChange(namespace, clusterData.getClusterName(), unit.getUnitName()); + notifyClusterChange(namespace, clusterData.getClusterName(), unit.getUnitName(),-1); } } } diff --git a/namingserver/src/test/java/org/apache/seata/namingserver/NamingControllerTest.java b/namingserver/src/test/java/org/apache/seata/namingserver/NamingControllerTest.java index 6ea5b9552e..200019da36 100644 --- a/namingserver/src/test/java/org/apache/seata/namingserver/NamingControllerTest.java +++ b/namingserver/src/test/java/org/apache/seata/namingserver/NamingControllerTest.java @@ -30,6 +30,7 @@ import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -61,8 +62,8 @@ class NamingControllerTest { node.setTransaction(new Node.Endpoint("127.0.0.1", 8091, "netty")); node.setControl(new Node.Endpoint("127.0.0.1", 7091, "http")); Map<String, Object> meatadata = node.getMetadata(); - List<String> vGroups = new ArrayList<>(); - vGroups.add("vgroup1"); + Map<String,Object> vGroups = new HashMap<>(); + vGroups.put("vgroup1",null); meatadata.put(CONSTANT_GROUP, vGroups); namingController.registerInstance(namespace, clusterName, unitName, node); String vGroup = "vgroup1"; @@ -92,16 +93,16 @@ class NamingControllerTest { node.setTransaction(new Node.Endpoint("127.0.0.1", 8091, "netty")); node.setControl(new Node.Endpoint("127.0.0.1", 7091, "http")); Map<String, Object> meatadata = node.getMetadata(); - List<String> vGroups = new ArrayList<>(); - vGroups.add("vgroup1"); + Map<String,Object> vGroups = new HashMap<>(); + vGroups.put("vgroup1",null); meatadata.put(CONSTANT_GROUP, vGroups); namingController.registerInstance(namespace, clusterName, unitName, node); NamingServerNode node2 = new NamingServerNode(); node2.setTransaction(new Node.Endpoint("127.0.0.1", 8091, "netty")); node2.setControl(new Node.Endpoint("127.0.0.1", 7091, "http")); Map<String, Object> meatadata2 = node2.getMetadata(); - List<String> vGroups2 = new ArrayList<>(); - vGroups2.add("vgroup2"); + Map<String,Object> vGroups2 = new HashMap<>(); + vGroups2.put("vgroup2",null); meatadata2.put(CONSTANT_GROUP, vGroups2); namingController.registerInstance(namespace, "cluster2", UUID.randomUUID().toString(), node2); String vGroup = "vgroup1"; @@ -136,8 +137,8 @@ class NamingControllerTest { node.setTransaction(new Node.Endpoint("127.0.0.1", 8091, "netty")); node.setControl(new Node.Endpoint("127.0.0.1", 7091, "http")); Map<String, Object> meatadata = node.getMetadata(); - List<String> vGroups = new ArrayList<>(); - vGroups.add("vgroup1"); + Map<String,Object> vGroups = new HashMap<>(); + vGroups.put("vgroup1",null); meatadata.put(CONSTANT_GROUP, vGroups); namingController.registerInstance(namespace, clusterName, unitName, node); String vGroup = "vgroup1"; @@ -164,4 +165,5 @@ class NamingControllerTest { cluster = metaResponse.getClusterList().get(0); assertEquals(0, cluster.getUnitData().size()); } + } \ No newline at end of file diff --git a/server/src/main/java/org/apache/seata/server/controller/NamingController.java b/server/src/main/java/org/apache/seata/server/controller/NamingController.java index cc521ca9f3..cd7a63074b 100644 --- a/server/src/main/java/org/apache/seata/server/controller/NamingController.java +++ b/server/src/main/java/org/apache/seata/server/controller/NamingController.java @@ -67,6 +67,7 @@ public class NamingController { mappingDO.setUnit(unit); mappingDO.setVGroup(vGroup); boolean rst = vGroupMappingStoreManager.addVGroup(mappingDO); + Instance.getInstance().setTerm(System.currentTimeMillis()); if (!rst) { result.setCode("500"); result.setMessage("add vGroup failed!"); @@ -86,6 +87,7 @@ public class NamingController { public Result<?> removeVGroup(@RequestParam String vGroup) { Result<?> result = new Result<>(); boolean rst = vGroupMappingStoreManager.removeVGroup(vGroup); + Instance.getInstance().setTerm(System.currentTimeMillis()); if (!rst) { result.setCode("500"); result.setMessage("remove vGroup failed!"); diff --git a/server/src/main/java/org/apache/seata/server/session/SessionHolder.java b/server/src/main/java/org/apache/seata/server/session/SessionHolder.java index 381be450f9..30a9a31412 100644 --- a/server/src/main/java/org/apache/seata/server/session/SessionHolder.java +++ b/server/src/main/java/org/apache/seata/server/session/SessionHolder.java @@ -78,7 +78,7 @@ public class SessionHolder { /** * The default vgroup mapping store dir */ - public static final String DEFAULT_VGROUP_MAPPING_STORE_FILE_DIR = System.getProperty("user.dir"); + public static final String DEFAULT_VGROUP_MAPPING_STORE_FILE_DIR = "vgroupStore"; private static VGroupMappingStoreManager ROOT_VGROUP_MAPPING_MANAGER; @@ -122,7 +122,8 @@ public class SessionHolder { RaftServerManager.start(); } else { String vGroupMappingStorePath = CONFIG.getConfig(ConfigurationKeys.STORE_FILE_DIR, - DEFAULT_VGROUP_MAPPING_STORE_FILE_DIR); + DEFAULT_VGROUP_MAPPING_STORE_FILE_DIR) + separator + + System.getProperty(SERVER_SERVICE_PORT_CAMEL); String sessionStorePath = CONFIG.getConfig(ConfigurationKeys.STORE_FILE_DIR, DEFAULT_SESSION_STORE_FILE_DIR) + separator + System.getProperty(SERVER_SERVICE_PORT_CAMEL); diff --git a/server/src/main/java/org/apache/seata/server/storage/file/store/FileVGroupMappingStoreManager.java b/server/src/main/java/org/apache/seata/server/storage/file/store/FileVGroupMappingStoreManager.java index 4b4fd26e0d..6a9a6739ca 100644 --- a/server/src/main/java/org/apache/seata/server/storage/file/store/FileVGroupMappingStoreManager.java +++ b/server/src/main/java/org/apache/seata/server/storage/file/store/FileVGroupMappingStoreManager.java @@ -17,7 +17,6 @@ package org.apache.seata.server.storage.file.store; - import org.apache.seata.common.loader.LoadLevel; import org.apache.seata.config.Configuration; import org.apache.seata.config.ConfigurationFactory; @@ -32,7 +31,9 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.util.HashMap; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; @LoadLevel(name = "file") public class FileVGroupMappingStoreManager implements VGroupMappingStoreManager { @@ -40,10 +41,13 @@ public class FileVGroupMappingStoreManager implements VGroupMappingStoreManager public static final String ROOT_MAPPING_MANAGER_NAME = "vgroup_mapping.json"; - private final ReentrantLock writeLock = new ReentrantLock(); + private final ReadWriteLock lock = new ReentrantReadWriteLock(); private String storePath; + HashMap<String, Object> vGroupMapping = new HashMap<>(); + + protected static final Configuration CONFIG = ConfigurationFactory.getInstance(); @@ -56,29 +60,50 @@ public class FileVGroupMappingStoreManager implements VGroupMappingStoreManager @Override public boolean addVGroup(MappingDO mappingDO) { - HashMap<String, Object> vGroupMapping = loadVGroups(); - vGroupMapping.put(mappingDO.getVGroup(), mappingDO.getUnit()); - boolean isSaved = save(vGroupMapping); - if (!isSaved) { - LOGGER.error("add mapping relationship failed!"); + Lock writeLock = lock.writeLock(); + writeLock.lock(); + try { + vGroupMapping.put(mappingDO.getVGroup(), mappingDO.getUnit()); + boolean isSaved = save(vGroupMapping); + + if (!isSaved) { + LOGGER.error("add mapping relationship failed!"); + } + return isSaved; + } finally { + writeLock.unlock(); } - return isSaved; } @Override public boolean removeVGroup(String vGroup) { - HashMap<String, Object> vGroupMapping = loadVGroups(); - vGroupMapping.remove(vGroup); - boolean isSaved = save(vGroupMapping); - if (!isSaved) { - LOGGER.error("remove mapping relationship failed!"); + Lock writeLock = lock.writeLock(); + writeLock.lock(); + try { + vGroupMapping.remove(vGroup); + boolean isSaved = save(vGroupMapping); + if (!isSaved) { + LOGGER.error("remove mapping relationship failed!"); + } + return isSaved; + } finally { + writeLock.unlock(); + } + } + + @Override + public HashMap<String, Object> readVGroups() { + Lock readLock = lock.readLock(); + readLock.lock(); + try { + return vGroupMapping; + } finally { + readLock.unlock(); } - return isSaved; } @Override public HashMap<String, Object> loadVGroups() { - HashMap<String, Object> vGroupMapping = new HashMap<>(); try { File fileToLoad = new File(storePath); if (!fileToLoad.exists()) { @@ -112,7 +137,6 @@ public class FileVGroupMappingStoreManager implements VGroupMappingStoreManager public boolean save(HashMap<String, Object> vGroupMapping) { - writeLock.lock(); try { ObjectMapper objectMapper = new ObjectMapper(); String jsonMapping = objectMapper.writeValueAsString(vGroupMapping); @@ -121,8 +145,6 @@ public class FileVGroupMappingStoreManager implements VGroupMappingStoreManager } catch (IOException e) { LOGGER.error("mapping relationship saved failed! ", e); return false; - } finally { - writeLock.unlock(); } } } diff --git a/server/src/main/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManager.java b/server/src/main/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManager.java index 032b46b4b8..7b4129cb5d 100644 --- a/server/src/main/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManager.java +++ b/server/src/main/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManager.java @@ -19,6 +19,7 @@ package org.apache.seata.server.storage.redis.store; import org.apache.seata.common.exception.RedisException; import org.apache.seata.common.loader.LoadLevel; import org.apache.seata.common.metadata.namingserver.Instance; +import org.apache.seata.common.util.StringUtils; import org.apache.seata.core.store.MappingDO; import org.apache.seata.server.storage.redis.JedisPooledFactory; import org.apache.seata.server.store.VGroupMappingStoreManager; @@ -66,7 +67,9 @@ public class RedisVGroupMappingStoreManager implements VGroupMappingStoreManager Map<String, String> mappingKeyMap = jedis.hgetAll(namespace); HashMap<String, Object> result = new HashMap<>(); for (Map.Entry<String, String> entry : mappingKeyMap.entrySet()) { - result.put(entry.getKey(), null); + if (StringUtils.equals(clusterName, entry.getValue())) { + result.put(entry.getKey(), null); + } } return result; } catch (Exception ex) { diff --git a/server/src/main/java/org/apache/seata/server/store/VGroupMappingStoreManager.java b/server/src/main/java/org/apache/seata/server/store/VGroupMappingStoreManager.java index 6f42f9822b..cf45795a7b 100644 --- a/server/src/main/java/org/apache/seata/server/store/VGroupMappingStoreManager.java +++ b/server/src/main/java/org/apache/seata/server/store/VGroupMappingStoreManager.java @@ -47,13 +47,17 @@ public interface VGroupMappingStoreManager { */ HashMap<String, Object> loadVGroups(); + default HashMap<String, Object> readVGroups() { + return loadVGroups(); + } + /** * notify mapping relationship to all namingserver nodes */ default void notifyMapping() { Instance instance = Instance.getInstance(); - instance.addMetadata("vGroup", this.loadVGroups()); + instance.addMetadata("vGroup", this.readVGroups()); try { InetSocketAddress address = new InetSocketAddress(XID.getIpAddress(), XID.getPort()); for (RegistryService registryService : MultiRegistryFactory.getInstances()) { diff --git a/server/src/main/resources/application.example.yml b/server/src/main/resources/application.example.yml index bcbb5d4bd0..86f0625a58 100644 --- a/server/src/main/resources/application.example.yml +++ b/server/src/main/resources/application.example.yml @@ -134,6 +134,7 @@ seata: group: SEATA_GROUP address-wait-time: 3000 + server: service-port: 8091 #If not configured, the default is '${server.port} + 1000' max-commit-retry-timeout: -1 --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org For additional commands, e-mail: notifications-h...@seata.apache.org