This is an automated email from the ASF dual-hosted git repository.
jianbin pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git
The following commit(s) were added to refs/heads/2.x by this push:
new dc08160e2d feature: Raft cluster mode supports address translation
(#7069)
dc08160e2d is described below
commit dc08160e2d4ed1ae6f04f71f8dc079ea3acf4a67
Author: PeppaO <[email protected]>
AuthorDate: Mon Dec 30 16:59:52 2024 +0800
feature: Raft cluster mode supports address translation (#7069)
---
changes/en-us/2.x.md | 4 +-
changes/zh-cn/2.x.md | 3 +-
.../org/apache/seata/common/ConfigurationKeys.java | 11 ++
.../common/exception/ParseEndpointException.java | 38 +++++++
.../org/apache/seata/common/metadata/Node.java | 86 ++++++++++++++-
.../java/org/apache/seata/common/util/NetUtil.java | 16 +++
.../registry/raft/RaftRegistryServiceImpl.java | 117 +++++++++++++++++++--
.../registry/raft/RaftRegistryServiceImplTest.java | 34 +++++-
script/client/conf/registry.conf | 4 +-
script/client/spring/application.properties | 4 +
script/client/spring/application.yml | 3 +
.../SeataCoreEnvironmentPostProcessor.java | 3 +
.../boot/autoconfigure/StarterConstants.java | 2 +
.../registry/RegistryMetadataProperties.java | 36 +++++++
.../raft/sync/msg/dto/RaftClusterMetadata.java | 15 ++-
15 files changed, 356 insertions(+), 20 deletions(-)
diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md
index 73676753a7..1ff5b89222 100644
--- a/changes/en-us/2.x.md
+++ b/changes/en-us/2.x.md
@@ -4,7 +4,8 @@ Add changes here for all PR submitted to the 2.x branch.
### feature:
-- [[#PR_NO](https://github.com/seata/seata/pull/PR_NO)] support XXX
+- [[#7069](https://github.com/apache/incubator-seata/pull/7069)] Raft cluster
mode supports address translation
+
### bugfix:
@@ -35,5 +36,6 @@ Thanks to these contributors for their code commits. Please
report an unintended
- [slievrly](https://github.com/slievrly)
- [lyl2008dsg](https://github.com/lyl2008dsg)
- [remind](https://github.com/remind)
+- [PeppaO](https://github.com/PeppaO)
Also, we receive many valuable issues, questions and advices from our
community. Thanks for you all.
\ No newline at end of file
diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md
index 7e3d9a5938..f38d504d3f 100644
--- a/changes/zh-cn/2.x.md
+++ b/changes/zh-cn/2.x.md
@@ -4,7 +4,7 @@
### feature:
-- [[#PR_NO](https://github.com/seata/seata/pull/PR_NO)] 支持XXX
+- [[#7069](https://github.com/apache/incubator-seata/pull/7069)] Raft集群模式支持地址转换
### bugfix:
@@ -35,5 +35,6 @@
- [slievrly](https://github.com/slievrly)
- [lyl2008dsg](https://github.com/lyl2008dsg)
- [remind](https://github.com/remind)
+- [PeppaO](https://github.com/PeppaO)
同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。
\ No newline at end of file
diff --git
a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java
b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java
index 840e029800..3bb4c9873f 100644
--- a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java
+++ b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java
@@ -1110,4 +1110,15 @@ public interface ConfigurationKeys {
* The constant META_PREFIX
*/
String META_PREFIX = SEATA_FILE_ROOT_CONFIG + FILE_CONFIG_SPLIT_CHAR +
FILE_ROOT_REGISTRY + FILE_CONFIG_SPLIT_CHAR + "metadata.";
+
+ /**
+ * The constant SERVER_REGISTRY_METADATA_PREFIX
+ */
+ String SERVER_REGISTRY_METADATA_PREFIX = SERVER_PREFIX +
FILE_ROOT_REGISTRY + ".metadata";
+
+ /**
+ * The constant SERVER_REGISTRY_METADATA_EXTERNAL
+ */
+ String SERVER_REGISTRY_METADATA_EXTERNAL = SERVER_REGISTRY_METADATA_PREFIX
+ ".external";
+
}
diff --git
a/common/src/main/java/org/apache/seata/common/exception/ParseEndpointException.java
b/common/src/main/java/org/apache/seata/common/exception/ParseEndpointException.java
new file mode 100644
index 0000000000..e4f550d548
--- /dev/null
+++
b/common/src/main/java/org/apache/seata/common/exception/ParseEndpointException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.common.exception;
+
+public class ParseEndpointException extends RuntimeException {
+ public ParseEndpointException() {
+ }
+
+ public ParseEndpointException(String message) {
+ super(message);
+ }
+
+ public ParseEndpointException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public ParseEndpointException(Throwable cause) {
+ super(cause);
+ }
+
+ public ParseEndpointException(String message, Throwable cause, boolean
enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
diff --git a/common/src/main/java/org/apache/seata/common/metadata/Node.java
b/common/src/main/java/org/apache/seata/common/metadata/Node.java
index 92d43c366f..bcc85a9696 100644
--- a/common/src/main/java/org/apache/seata/common/metadata/Node.java
+++ b/common/src/main/java/org/apache/seata/common/metadata/Node.java
@@ -18,11 +18,12 @@ package org.apache.seata.common.metadata;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
-
+import org.apache.seata.common.exception.ParseEndpointException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
-
+import java.util.List;
+import java.util.ArrayList;
public class Node {
@@ -195,4 +196,85 @@ public class Node {
}
}
+ private Node.ExternalEndpoint createExternalEndpoint(String host, int
controllerPort, int transactionPort) {
+ return new Node.ExternalEndpoint(host, controllerPort,
transactionPort);
+ }
+
+ public List<ExternalEndpoint> createExternalEndpoints(String external) {
+ List<Node.ExternalEndpoint> externalEndpoints = new ArrayList<>();
+ String[] split = external.split(",");
+
+ for (String s : split) {
+ String[] item = s.split(":");
+ if (item.length == 3) {
+ try {
+ String host = item[0];
+ int controllerPort = Integer.parseInt(item[1]);
+ int transactionPort = Integer.parseInt(item[2]);
+ externalEndpoints.add(createExternalEndpoint(host,
controllerPort, transactionPort));
+ } catch (NumberFormatException e) {
+ throw new ParseEndpointException("Invalid port number in:
" + s);
+ }
+ } else {
+ throw new ParseEndpointException("Invalid format for endpoint:
" + s);
+ }
+ }
+ return externalEndpoints;
+ }
+
+ public Map<String, Object> updateMetadataWithExternalEndpoints(Map<String,
Object> metadata, List<Node.ExternalEndpoint> externalEndpoints) {
+ Object obj = metadata.get("external");
+ if (obj == null) {
+ if (!externalEndpoints.isEmpty()) {
+ Map<String, Object> metadataMap = new HashMap<>(metadata);
+ metadataMap.put("external", externalEndpoints);
+ return metadataMap;
+ }
+ return metadata;
+ }
+ if (obj instanceof List) {
+ List<Node.ExternalEndpoint> oldList =
(List<Node.ExternalEndpoint>) obj;
+ oldList.addAll(externalEndpoints);
+ return metadata;
+ } else {
+ throw new ParseEndpointException("Metadata 'external' is not a
List.");
+ }
+ }
+
+ public static class ExternalEndpoint {
+
+ private String host;
+ private int controlPort;
+ private int transactionPort;
+
+ public ExternalEndpoint(String host, int controlPort, int
transactionPort) {
+ this.host = host;
+ this.controlPort = controlPort;
+ this.transactionPort = transactionPort;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public int getControlPort() {
+ return controlPort;
+ }
+
+ public void setControlPort(int controlPort) {
+ this.controlPort = controlPort;
+ }
+
+ public int getTransactionPort() {
+ return transactionPort;
+ }
+
+ public void setTransactionPort(int transactionPort) {
+ this.transactionPort = transactionPort;
+ }
+ }
}
diff --git a/common/src/main/java/org/apache/seata/common/util/NetUtil.java
b/common/src/main/java/org/apache/seata/common/util/NetUtil.java
index 008f0839be..2043f85c79 100644
--- a/common/src/main/java/org/apache/seata/common/util/NetUtil.java
+++ b/common/src/main/java/org/apache/seata/common/util/NetUtil.java
@@ -90,9 +90,25 @@ public class NetUtil {
* @return the string
*/
public static String toStringAddress(InetSocketAddress address) {
+ if (address.getAddress() == null) {
+ return address.getHostString() + ":" + address.getPort();
+ }
return address.getAddress().getHostAddress() + ":" + address.getPort();
}
+ /**
+ * To string host string.
+ *
+ * @param address the address
+ * @return the string
+ */
+ public static String toStringHost(InetSocketAddress address) {
+ if (address.getAddress() == null) {
+ return address.getHostString();
+ }
+ return address.getAddress().getHostAddress();
+ }
+
/**
* To inet socket address inet socket address.
*
diff --git
a/discovery/seata-discovery-raft/src/main/java/org/apache/seata/discovery/registry/raft/RaftRegistryServiceImpl.java
b/discovery/seata-discovery-raft/src/main/java/org/apache/seata/discovery/registry/raft/RaftRegistryServiceImpl.java
index 8ba0d2256e..06bed7b639 100644
---
a/discovery/seata-discovery-raft/src/main/java/org/apache/seata/discovery/registry/raft/RaftRegistryServiceImpl.java
+++
b/discovery/seata-discovery-raft/src/main/java/org/apache/seata/discovery/registry/raft/RaftRegistryServiceImpl.java
@@ -24,6 +24,9 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.LinkedHashMap;
+import java.util.Optional;
+import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
@@ -38,6 +41,8 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seata.common.ConfigurationKeys;
import org.apache.seata.common.exception.AuthenticationFailedException;
+import org.apache.seata.common.exception.NotSupportYetException;
+import org.apache.seata.common.exception.ParseEndpointException;
import org.apache.seata.common.exception.RetryableException;
import org.apache.seata.common.metadata.Metadata;
import org.apache.seata.common.metadata.MetadataResponse;
@@ -45,6 +50,7 @@ import org.apache.seata.common.metadata.Node;
import org.apache.seata.common.thread.NamedThreadFactory;
import org.apache.seata.common.util.CollectionUtils;
import org.apache.seata.common.util.HttpClientUtil;
+import org.apache.seata.common.util.NetUtil;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.config.ConfigChangeListener;
import org.apache.seata.config.Configuration;
@@ -114,10 +120,13 @@ public class RaftRegistryServiceImpl implements
RegistryService<ConfigChangeList
*/
private static final Map<String, List<InetSocketAddress>> ALIVE_NODES =
new ConcurrentHashMap<>();
+ private static final String PREFERRED_NETWORKS;
+
static {
TOKEN_EXPIRE_TIME_IN_MILLISECONDS =
CONFIG.getLong(getTokenExpireTimeInMillisecondsKey(), 29 * 60 * 1000L);
USERNAME = CONFIG.getConfig(getRaftUserNameKey());
PASSWORD = CONFIG.getConfig(getRaftPassWordKey());
+ PREFERRED_NETWORKS = CONFIG.getConfig(getPreferredNetworks());
}
private RaftRegistryServiceImpl() {
@@ -221,7 +230,7 @@ public class RaftRegistryServiceImpl implements
RegistryService<ConfigChangeList
List<InetSocketAddress> inetSocketAddresses =
ALIVE_NODES.get(CURRENT_TRANSACTION_SERVICE_GROUP);
if (CollectionUtils.isEmpty(inetSocketAddresses)) {
addressList =
- nodeList.stream().map(node ->
node.getControl().createAddress()).collect(Collectors.toList());
+
nodeList.stream().map(RaftRegistryServiceImpl::selectControlEndpointStr).collect(Collectors.toList());
} else {
stream = inetSocketAddresses.stream();
}
@@ -234,15 +243,20 @@ public class RaftRegistryServiceImpl implements
RegistryService<ConfigChangeList
Map<String, Node> map = new HashMap<>();
if (CollectionUtils.isNotEmpty(nodeList)) {
for (Node node : nodeList) {
- map.put(new
InetSocketAddress(node.getTransaction().getHost(),
node.getTransaction().getPort()).getAddress().getHostAddress()
- + IP_PORT_SPLIT_CHAR +
node.getTransaction().getPort(), node);
+ InetSocketAddress inetSocketAddress =
selectTransactionEndpoint(node);
+ map.put(inetSocketAddress.getHostString()
+ + IP_PORT_SPLIT_CHAR + inetSocketAddress.getPort(),
node);
}
}
addressList = stream.map(inetSocketAddress -> {
- String host = inetSocketAddress.getAddress().getHostAddress();
+ String host = NetUtil.toStringHost(inetSocketAddress);
Node node = map.get(host + IP_PORT_SPLIT_CHAR +
inetSocketAddress.getPort());
+ InetSocketAddress controlEndpoint = null;
+ if (node != null) {
+ controlEndpoint = selectControlEndpoint(node);
+ }
return host + IP_PORT_SPLIT_CHAR
- + (node != null ? node.getControl().getPort() :
inetSocketAddress.getPort());
+ + (controlEndpoint != null ? controlEndpoint.getPort() :
inetSocketAddress.getPort());
}).collect(Collectors.toList());
return
addressList.get(ThreadLocalRandom.current().nextInt(addressList.size()));
}
@@ -263,6 +277,11 @@ public class RaftRegistryServiceImpl implements
RegistryService<ConfigChangeList
REGISTRY_TYPE, PRO_PASSWORD_KEY);
}
+ private static String getPreferredNetworks() {
+ return String.join(ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR,
ConfigurationKeys.FILE_ROOT_REGISTRY,
+ "preferredNetworks");
+ }
+
private static String getTokenExpireTimeInMillisecondsKey() {
return String.join(ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR,
ConfigurationKeys.FILE_ROOT_REGISTRY,
REGISTRY_TYPE, TOKEN_VALID_TIME_MS_KEY);
@@ -276,9 +295,85 @@ public class RaftRegistryServiceImpl implements
RegistryService<ConfigChangeList
return System.currentTimeMillis() >= tokenExpiredTime;
}
- private InetSocketAddress convertInetSocketAddress(Node node) {
- Node.Endpoint endpoint = node.getTransaction();
- return new InetSocketAddress(endpoint.getHost(), endpoint.getPort());
+ private static String selectControlEndpointStr(Node node) {
+ InetSocketAddress control = selectControlEndpoint(node);
+ return NetUtil.toStringAddress(control);
+ }
+
+ private static String selectTransactionEndpointStr(Node node) {
+ InetSocketAddress transaction = selectTransactionEndpoint(node);
+ return NetUtil.toStringAddress(transaction);
+ }
+
+ private static InetSocketAddress selectControlEndpoint(Node node) {
+ return selectEndpoint("control", node);
+ }
+
+ private static InetSocketAddress selectTransactionEndpoint(Node node) {
+ return selectEndpoint("transaction", node);
+ }
+
+ private static InetSocketAddress selectEndpoint(String type, Node node) {
+ if (StringUtils.isBlank(PREFERRED_NETWORKS)) {
+ // Use the default method, directly using node.control and
node.transaction
+ switch (type) {
+ case "control":
+ return new InetSocketAddress(node.getControl().getHost(),
node.getControl().getPort());
+ case "transaction":
+ return new
InetSocketAddress(node.getTransaction().getHost(),
node.getTransaction().getPort());
+ default:
+ throw new NotSupportYetException("SelectEndpoint is not
support type: " + type);
+ }
+ }
+ Node.ExternalEndpoint externalEndpoint = selectExternalEndpoint(node,
PREFERRED_NETWORKS.split(";"));
+ switch (type) {
+ case "control":
+ return new InetSocketAddress(externalEndpoint.getHost(),
externalEndpoint.getControlPort());
+ case "transaction":
+ return new InetSocketAddress(externalEndpoint.getHost(),
externalEndpoint.getTransactionPort());
+ default:
+ throw new NotSupportYetException("SelectEndpoint is not
support type: " + type);
+ }
+ }
+
+ private static Node.ExternalEndpoint selectExternalEndpoint(Node node,
String[] preferredNetworks) {
+ Map<String, Object> metadata = node.getMetadata();
+ if (CollectionUtils.isEmpty(metadata)) {
+ throw new ParseEndpointException("Node metadata is empty.");
+ }
+
+ Object external = metadata.get("external");
+
+ if (external instanceof List<?>) {
+ List<LinkedHashMap<String, Object>> externalEndpoints =
(List<LinkedHashMap<String, Object>>) external;
+
+ if (CollectionUtils.isEmpty(externalEndpoints)) {
+ throw new ParseEndpointException("ExternalEndpoints should not
be empty.");
+ }
+
+ for (LinkedHashMap<String, Object> externalEndpoint :
externalEndpoints) {
+ String ip = Optional.ofNullable(externalEndpoint.get("host"))
+ .map(Object::toString)
+ .orElse("");
+
+ if (isPreferredNetwork(ip, Arrays.asList(preferredNetworks))) {
+ return createExternalEndpoint(externalEndpoint, ip);
+ }
+ }
+ }
+ throw new ParseEndpointException("No ExternalEndpoints value
matches.");
+ }
+
+ private static boolean isPreferredNetwork(String ip, List<String>
preferredNetworks) {
+ return preferredNetworks.stream().anyMatch(regex ->
+ StringUtils.isNotBlank(regex) && (ip.matches(regex) ||
ip.startsWith(regex))
+ );
+ }
+
+ private static Node.ExternalEndpoint
createExternalEndpoint(LinkedHashMap<String, Object> externalEndpoint, String
ip) {
+ int controlPort =
Integer.parseInt(externalEndpoint.get("controlPort").toString());
+ int transactionPort =
Integer.parseInt(externalEndpoint.get("transactionPort").toString());
+ return new Node.ExternalEndpoint(ip, controlPort, transactionPort);
}
@Override
@@ -292,7 +387,7 @@ public class RaftRegistryServiceImpl implements
RegistryService<ConfigChangeList
String clusterName = getServiceGroup(transactionServiceGroup);
Node leader = METADATA.getLeader(clusterName);
if (leader != null) {
- return
Collections.singletonList(convertInetSocketAddress(leader));
+ return
Collections.singletonList(selectTransactionEndpoint(leader));
}
}
return RegistryService.super.aliveLookup(transactionServiceGroup);
@@ -340,7 +435,7 @@ public class RaftRegistryServiceImpl implements
RegistryService<ConfigChangeList
List<InetSocketAddress> aliveAddress) {
if (METADATA.isRaftMode()) {
Node leader =
METADATA.getLeader(getServiceGroup(transactionServiceGroup));
- InetSocketAddress leaderAddress = convertInetSocketAddress(leader);
+ InetSocketAddress leaderAddress =
selectTransactionEndpoint(leader);
return ALIVE_NODES.put(transactionServiceGroup,
aliveAddress.isEmpty() ? aliveAddress :
aliveAddress.parallelStream().filter(inetSocketAddress -> {
// Since only follower will turn into leader, only the
follower node needs to be listened to
@@ -478,7 +573,7 @@ public class RaftRegistryServiceImpl implements
RegistryService<ConfigChangeList
}
List<Node> nodes = METADATA.getNodes(clusterName);
if (CollectionUtils.isNotEmpty(nodes)) {
- return
nodes.parallelStream().map(this::convertInetSocketAddress).collect(Collectors.toList());
+ return
nodes.parallelStream().map(RaftRegistryServiceImpl::selectTransactionEndpoint).collect(Collectors.toList());
}
return Collections.emptyList();
}
diff --git
a/discovery/seata-discovery-raft/src/test/java/org/apache/seata/discovery/registry/raft/RaftRegistryServiceImplTest.java
b/discovery/seata-discovery-raft/src/test/java/org/apache/seata/discovery/registry/raft/RaftRegistryServiceImplTest.java
index e6c88fcc5c..45b675dbe5 100644
---
a/discovery/seata-discovery-raft/src/test/java/org/apache/seata/discovery/registry/raft/RaftRegistryServiceImplTest.java
+++
b/discovery/seata-discovery-raft/src/test/java/org/apache/seata/discovery/registry/raft/RaftRegistryServiceImplTest.java
@@ -17,13 +17,18 @@
package org.apache.seata.discovery.registry.raft;
-import org.apache.seata.common.util.HttpClientUtil;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.seata.common.metadata.MetadataResponse;
+import org.apache.seata.common.metadata.Node;
+import org.apache.seata.common.util.*;
import org.apache.seata.config.ConfigurationFactory;
import org.apache.http.HttpStatus;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.entity.StringEntity;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
@@ -33,7 +38,7 @@ import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.util.Map;
+import java.util.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -53,6 +58,7 @@ class RaftRegistryServiceImplTest {
System.setProperty("registry.raft.password", "seata");
System.setProperty("registry.raft.serverAddr", "127.0.0.1:8092");
System.setProperty("registry.raft.tokenValidityInMilliseconds",
"10000");
+ System.setProperty("registry.preferredNetworks", "10.10.*");
ConfigurationFactory.getInstance();
}
@@ -145,4 +151,28 @@ class RaftRegistryServiceImplTest {
assertEquals(true, rst);
}
+ /**
+ * RaftRegistryServiceImpl#controlEndpointStr()
+ * RaftRegistryServiceImpl#transactionEndpointStr()
+ */
+ @Test
+ public void testSelectEndpoint() throws JsonProcessingException,
NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+ String jsonString =
"{\"nodes\":[{\"control\":{\"host\":\"v-0.svc-l.default.svc.cluster.local\",\"port\":7091},\"transaction\":{\"host\":\"v-0.svc-l.default.svc.cluster.local\",\"port\":8091},\"internal\":{\"host\":\"v-0.svc-l.default.svc.cluster.local\",\"port\":9091},\"group\":\"default\",\"role\":\"LEADER\",\"version\":\"2.3.0-SNAPSHOT\",\"metadata\":{\"external\":[{\"host\":\"192.168.105.7\",\"controlPort\":30071,\"transactionPort\":30091},{\"host\":\"10.10.105.7\",\"controlP
[...]
+
+ Method selectControlEndpointStrMethod =
RaftRegistryServiceImpl.class.getDeclaredMethod("selectControlEndpointStr",
Node.class);
+ selectControlEndpointStrMethod.setAccessible(true);
+ Method selectTransactionEndpointStrMethod =
RaftRegistryServiceImpl.class.getDeclaredMethod("selectTransactionEndpointStr",
Node.class);
+ selectTransactionEndpointStrMethod.setAccessible(true);
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ MetadataResponse metadataResponse = objectMapper.readValue(jsonString,
MetadataResponse.class);
+ List<Node> nodes = metadataResponse.getNodes();
+
+ for (Node node : nodes) {
+ String controlEndpointStr = (String)
selectControlEndpointStrMethod.invoke(null, node);;
+ String transactionEndpointStr = (String)
selectTransactionEndpointStrMethod.invoke(null, node);;
+
Assertions.assertTrue(controlEndpointStr.contains("10.10.105.7:3007"));
+
Assertions.assertTrue(transactionEndpointStr.contains("10.10.105.7:3009"));
+ }
+ }
}
diff --git a/script/client/conf/registry.conf b/script/client/conf/registry.conf
index 1aabbc507f..d21f03f723 100644
--- a/script/client/conf/registry.conf
+++ b/script/client/conf/registry.conf
@@ -18,7 +18,9 @@
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa、custom、raft、seata
type = "file"
-
+ # Supports address translation parameters, currently only supported in raft
mode,
+ # if match the preferredNetworks rule return the first, eg:
preferredNetworks = "192.168.*"
+ preferredNetworks = ""
raft {
metadata-max-age-ms = 30000
serverAddr = "127.0.0.1:7091"
diff --git a/script/client/spring/application.properties
b/script/client/spring/application.properties
index 2a72d1e5f7..620cbd9714 100755
--- a/script/client/spring/application.properties
+++ b/script/client/spring/application.properties
@@ -123,6 +123,10 @@ seata.config.custom.name=
seata.registry.type=file
+# Supports address translation parameters, currently only supported in raft
mode?
+# if match the preferredNetworks rule return the first, eg: preferredNetworks
= "192.168.*"
+seata.registry.preferredNetworks = ""
+
seata.registry.raft.server-addr=
seata.registry.raft.metadata-max-age-ms=30000
seata.registry.raft.username=seata
diff --git a/script/client/spring/application.yml
b/script/client/spring/application.yml
index a6100f0574..580cb0180e 100755
--- a/script/client/spring/application.yml
+++ b/script/client/spring/application.yml
@@ -132,6 +132,9 @@ seata:
name:
registry:
type: file
+ # Supports address translation parameters, currently only supported in
raft mode,
+ # if match the preferredNetworks rule return the first, eg:
preferredNetworks = "192.168.*"
+ preferredNetworks: ""
seata:
server-addr: 127.0.0.1:8081
namespace: public
diff --git
a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/SeataCoreEnvironmentPostProcessor.java
b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/SeataCoreEnvironmentPostProcessor.java
index 67bc7bd75a..bfa4923469 100644
---
a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/SeataCoreEnvironmentPostProcessor.java
+++
b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/SeataCoreEnvironmentPostProcessor.java
@@ -40,6 +40,7 @@ import
org.apache.seata.spring.boot.autoconfigure.properties.registry.RegistryRa
import
org.apache.seata.spring.boot.autoconfigure.properties.registry.RegistryRedisProperties;
import
org.apache.seata.spring.boot.autoconfigure.properties.registry.RegistrySofaProperties;
import
org.apache.seata.spring.boot.autoconfigure.properties.registry.RegistryZooKeeperProperties;
+import
org.apache.seata.spring.boot.autoconfigure.properties.registry.RegistryMetadataProperties;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.env.EnvironmentPostProcessor;
import org.springframework.core.Ordered;
@@ -69,6 +70,7 @@ import static
org.apache.seata.spring.boot.autoconfigure.StarterConstants.REGIST
import static
org.apache.seata.spring.boot.autoconfigure.StarterConstants.SHUTDOWN_PREFIX;
import static
org.apache.seata.spring.boot.autoconfigure.StarterConstants.THREAD_FACTORY_PREFIX;
import static
org.apache.seata.spring.boot.autoconfigure.StarterConstants.TRANSPORT_PREFIX;
+import static
org.apache.seata.spring.boot.autoconfigure.StarterConstants.REGISTRY_METADATA_PREFIX;
public class SeataCoreEnvironmentPostProcessor implements
EnvironmentPostProcessor, Ordered {
@@ -90,6 +92,7 @@ public class SeataCoreEnvironmentPostProcessor implements
EnvironmentPostProcess
PROPERTY_BEAN_MAP.put(CONFIG_PREFIX, ConfigProperties.class);
PROPERTY_BEAN_MAP.put(CONFIG_FILE_PREFIX,
ConfigFileProperties.class);
PROPERTY_BEAN_MAP.put(REGISTRY_PREFIX, RegistryProperties.class);
+ PROPERTY_BEAN_MAP.put(REGISTRY_METADATA_PREFIX,
RegistryMetadataProperties.class);
PROPERTY_BEAN_MAP.put(CONFIG_NACOS_PREFIX,
ConfigNacosProperties.class);
PROPERTY_BEAN_MAP.put(CONFIG_CONSUL_PREFIX,
ConfigConsulProperties.class);
diff --git
a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/StarterConstants.java
b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/StarterConstants.java
index 5ec088d43f..db09911af1 100644
---
a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/StarterConstants.java
+++
b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/StarterConstants.java
@@ -58,6 +58,8 @@ public interface StarterConstants {
String REGISTRY_SOFA_PREFIX = REGISTRY_PREFIX + ".sofa";
String REGISTRY_CUSTOM_PREFIX = REGISTRY_PREFIX + ".custom";
+ String REGISTRY_METADATA_PREFIX = REGISTRY_PREFIX + ".metadata";
+
String CONFIG_PREFIX = SEATA_PREFIX + ".config";
String CONFIG_NACOS_PREFIX = CONFIG_PREFIX + ".nacos";
String CONFIG_CONSUL_PREFIX = CONFIG_PREFIX + ".consul";
diff --git
a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/registry/RegistryMetadataProperties.java
b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/registry/RegistryMetadataProperties.java
new file mode 100644
index 0000000000..595b322b79
--- /dev/null
+++
b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/registry/RegistryMetadataProperties.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.spring.boot.autoconfigure.properties.registry;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+import static
org.apache.seata.spring.boot.autoconfigure.StarterConstants.REGISTRY_METADATA_PREFIX;
+
+@Component
+@ConfigurationProperties(prefix = REGISTRY_METADATA_PREFIX)
+public class RegistryMetadataProperties {
+ private String external;
+
+ public String getExternal() {
+ return external;
+ }
+
+ public RegistryMetadataProperties setExternal(String external) {
+ this.external = external;
+ return this;
+ }
+}
diff --git
a/server/src/main/java/org/apache/seata/server/cluster/raft/sync/msg/dto/RaftClusterMetadata.java
b/server/src/main/java/org/apache/seata/server/cluster/raft/sync/msg/dto/RaftClusterMetadata.java
index 2e3e217a96..7f51ff9efa 100644
---
a/server/src/main/java/org/apache/seata/server/cluster/raft/sync/msg/dto/RaftClusterMetadata.java
+++
b/server/src/main/java/org/apache/seata/server/cluster/raft/sync/msg/dto/RaftClusterMetadata.java
@@ -21,10 +21,12 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-
+import org.apache.seata.common.holder.ObjectHolder;
import org.apache.seata.common.metadata.Node;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.core.protocol.Version;
+import org.springframework.core.env.ConfigurableEnvironment;
+import static
org.apache.seata.common.Constants.OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT;
/**
*/
@@ -55,7 +57,16 @@ public class RaftClusterMetadata implements Serializable {
node.setGroup(group);
node.setVersion(Version.getCurrent());
node.setInternal(node.createEndpoint(host, internalPort, "raft"));
- Optional.ofNullable(metadata).ifPresent(node::setMetadata);
+ ConfigurableEnvironment environment = (ConfigurableEnvironment)
ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT);
+ String seataRegistryMetadataExternalValue =
environment.resolvePlaceholders("${SEATA_REGISTRY_METADATA_EXTERNAL:${seata.registry.metadata.external:}}");
+ if (metadata != null) {
+ if (StringUtils.isNotEmpty(seataRegistryMetadataExternalValue)) {
+ Map<String, Object> newMetadata =
node.updateMetadataWithExternalEndpoints(metadata,
node.createExternalEndpoints(seataRegistryMetadataExternalValue));
+ Optional.ofNullable(newMetadata).ifPresent(node::setMetadata);
+ } else {
+ node.setMetadata(metadata);
+ }
+ }
return node;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]