This is an automated email from the ASF dual-hosted git repository.
hefengen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git
The following commit(s) were added to refs/heads/master by this push:
new fc8c796b97 [type:feat]HTTP and WebSocket synchronous mode supports
heartbeat detection (#6179)
fc8c796b97 is described below
commit fc8c796b97e39c6cca46f0208a5bb9f3105f15b6
Author: xchoox <[email protected]>
AuthorDate: Tue Sep 30 09:01:06 2025 +0800
[type:feat]HTTP and WebSocket synchronous mode supports heartbeat detection
(#6179)
* handler
* handler
* commit
* client finish
* 1
* bootstrap finish
* fix
* 'fix'
* 数据可视化
* 图标可视化
* 可视化
* fix cr
* fix cr
* fix cr
* fix cr
* fix cr
* fix cr
* http and websocket heartbeat
* fix
* fix
* fix
* fix
* fix
---------
Co-authored-by: xcsnx <[email protected]>
Co-authored-by: aias00 <[email protected]>
---
.../http/HttpLongPollingDataChangedListener.java | 21 ++++++++++++
.../listener/websocket/WebsocketCollector.java | 39 +++++++++++++++++++++-
.../listener/websocket/WebsocketConfigurator.java | 1 +
.../listener/websocket/WebsocketListener.java | 5 +++
.../apache/shenyu/common/constant/Constants.java | 4 +++
.../client/http/HttpClientRegisterRepository.java | 2 +-
.../websocket/WebsocketSyncDataConfiguration.java | 12 ++++---
.../shenyu/sync/data/http/HttpSyncDataService.java | 8 +++++
.../shenyu-sync-data-websocket/pom.xml | 7 ++--
.../data/websocket/WebsocketSyncDataService.java | 25 +++++++++-----
.../websocket/client/ShenyuWebsocketClient.java | 27 ++++++++++++---
11 files changed, 129 insertions(+), 22 deletions(-)
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/http/HttpLongPollingDataChangedListener.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/http/HttpLongPollingDataChangedListener.java
index 3a409f6763..c269664d18 100644
---
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/http/HttpLongPollingDataChangedListener.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/http/HttpLongPollingDataChangedListener.java
@@ -24,10 +24,14 @@ import org.apache.commons.lang3.math.NumberUtils;
import org.apache.shenyu.admin.config.properties.HttpSyncProperties;
import org.apache.shenyu.admin.listener.AbstractDataChangedListener;
import org.apache.shenyu.admin.listener.ConfigDataCache;
+import org.apache.shenyu.admin.model.event.instance.InstanceInfoReportEvent;
import org.apache.shenyu.admin.model.result.ShenyuAdminResult;
+import
org.apache.shenyu.admin.service.publish.InstanceInfoReportEventPublisher;
+import org.apache.shenyu.admin.spring.SpringBeanUtils;
import org.apache.shenyu.admin.utils.ShenyuResultMessage;
import org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
import org.apache.shenyu.common.constant.HttpConstants;
+import org.apache.shenyu.common.constant.InstanceTypeConstants;
import org.apache.shenyu.common.dto.AppAuthData;
import org.apache.shenyu.common.dto.DiscoverySyncData;
import org.apache.shenyu.common.dto.MetaData;
@@ -84,6 +88,10 @@ public class HttpLongPollingDataChangedListener extends
AbstractDataChangedListe
private static final String X_FORWARDED_FOR_SPLIT_SYMBOL = ",";
+ private static final String X_REAL_PORT = "X-Real-PORT";
+
+ private static final String CLIENT_PORT_ZERO = "0";
+
/**
* Blocked client.
*/
@@ -133,6 +141,19 @@ public class HttpLongPollingDataChangedListener extends
AbstractDataChangedListe
List<ConfigGroupEnum> changedGroup = compareChangedGroup(request);
final String clientIp = getRemoteIp(request);
final String namespaceId = getNamespaceId(request);
+ final String bootstrapInfo =
StringUtils.defaultString(request.getHeader(InstanceTypeConstants.BOOTSTRAP_INSTANCE_INFO),
"");
+ final String clientPort =
StringUtils.defaultString(request.getHeader(X_REAL_PORT), StringUtils.EMPTY);
+ if (!CLIENT_PORT_ZERO.equals(clientPort)) {
+ InstanceInfoReportEvent instanceInfoReportEvent =
InstanceInfoReportEvent.builder()
+ .instanceIp(clientIp)
+ .instancePort(clientPort)
+
.instanceInfo(GsonUtils.getInstance().toJson(bootstrapInfo))
+
.instanceType(InstanceTypeConstants.BOOTSTRAP_INSTANCE_TYPE)
+ .instanceState(1)
+ .namespaceId(namespaceId)
+ .build();
+
SpringBeanUtils.getInstance().getBean(InstanceInfoReportEventPublisher.class).publish(instanceInfoReportEvent);
+ }
// response immediately.
if (CollectionUtils.isNotEmpty(changedGroup)) {
this.generateResponse(response, changedGroup);
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/websocket/WebsocketCollector.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/websocket/WebsocketCollector.java
index e142fca8ce..0631b5ddd0 100644
---
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/websocket/WebsocketCollector.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/websocket/WebsocketCollector.java
@@ -23,14 +23,18 @@ import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.shenyu.admin.config.properties.ClusterProperties;
import org.apache.shenyu.admin.mode.cluster.service.ClusterSelectMasterService;
+import org.apache.shenyu.admin.model.event.instance.InstanceInfoReportEvent;
import org.apache.shenyu.admin.service.SyncDataService;
+import
org.apache.shenyu.admin.service.publish.InstanceInfoReportEventPublisher;
import org.apache.shenyu.admin.spring.SpringBeanUtils;
import org.apache.shenyu.admin.utils.ThreadLocalUtils;
import org.apache.shenyu.common.constant.Constants;
+import org.apache.shenyu.common.constant.InstanceTypeConstants;
import org.apache.shenyu.common.constant.RunningModeConstants;
import org.apache.shenyu.common.enums.DataEventTypeEnum;
import org.apache.shenyu.common.enums.RunningModeEnum;
import org.apache.shenyu.common.exception.ShenyuException;
+import org.apache.shenyu.common.utils.GsonUtils;
import org.apache.shenyu.common.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,6 +102,20 @@ public class WebsocketCollector {
.map(Object::toString)
.orElse(StringUtils.EMPTY);
}
+
+ private static String getClientPort(final Session session) {
+ if (!session.isOpen()) {
+ return StringUtils.EMPTY;
+ }
+ Map<String, Object> userProperties = session.getUserProperties();
+ if (MapUtils.isEmpty(userProperties)) {
+ return StringUtils.EMPTY;
+ }
+
+ return
Optional.ofNullable(userProperties.get(Constants.CLIENT_PORT_NAME))
+ .map(Object::toString)
+ .orElse(StringUtils.EMPTY);
+ }
private static String getNamespaceId(final Session session) {
if (!session.isOpen()) {
@@ -124,7 +142,26 @@ public class WebsocketCollector {
@OnMessage
public void onMessage(final String message, final Session session) {
if (!Objects.equals(message, DataEventTypeEnum.MYSELF.name())
- && !Objects.equals(message,
DataEventTypeEnum.RUNNING_MODE.name())) {
+ && !Objects.equals(message,
DataEventTypeEnum.RUNNING_MODE.name())
+ && !message.contains("bootstrapInstanceInfo")) {
+ return;
+ }
+ if (message.contains(InstanceTypeConstants.BOOTSTRAP_INSTANCE_INFO)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("bootstrap report instance info: {}", message);
+ }
+ String namespaceId = getNamespaceId(session);
+ Map<String, Object> infoMap =
GsonUtils.getInstance().convertToMap(message);
+ Object o =
infoMap.get(InstanceTypeConstants.BOOTSTRAP_INSTANCE_INFO);
+ InstanceInfoReportEvent instanceInfoRegisterDTO =
InstanceInfoReportEvent.builder()
+ .instanceIp(getClientIp(session))
+ .instancePort(getClientPort(session))
+
.instanceType(InstanceTypeConstants.BOOTSTRAP_INSTANCE_TYPE)
+ .instanceInfo(GsonUtils.getInstance().toJson(o))
+ .instanceState(1)
+ .namespaceId(namespaceId)
+ .build();
+
SpringBeanUtils.getInstance().getBean(InstanceInfoReportEventPublisher.class).publish(instanceInfoRegisterDTO);
return;
}
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/websocket/WebsocketConfigurator.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/websocket/WebsocketConfigurator.java
index 5a13a8a728..2c020d6a4d 100644
---
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/websocket/WebsocketConfigurator.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/websocket/WebsocketConfigurator.java
@@ -54,6 +54,7 @@ public class WebsocketConfigurator extends
ServerEndpointConfig.Configurator imp
public void modifyHandshake(final ServerEndpointConfig sec, final
HandshakeRequest request, final HandshakeResponse response) {
HttpSession httpSession = (HttpSession) request.getHttpSession();
sec.getUserProperties().put(WebsocketListener.CLIENT_IP_NAME,
httpSession.getAttribute(WebsocketListener.CLIENT_IP_NAME));
+ sec.getUserProperties().put(Constants.CLIENT_PORT_NAME,
httpSession.getAttribute(Constants.CLIENT_PORT_NAME));
sec.getUserProperties().put(Constants.SHENYU_NAMESPACE_ID,
httpSession.getAttribute(Constants.SHENYU_NAMESPACE_ID));
super.modifyHandshake(sec, request, response);
}
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/websocket/WebsocketListener.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/websocket/WebsocketListener.java
index abff0a7bc7..62f426737f 100644
---
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/websocket/WebsocketListener.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/websocket/WebsocketListener.java
@@ -53,6 +53,8 @@ public class WebsocketListener implements
ServletRequestListener {
session.removeAttribute(CLIENT_IP_NAME);
request.removeAttribute(Constants.SHENYU_NAMESPACE_ID);
session.removeAttribute(Constants.SHENYU_NAMESPACE_ID);
+ request.removeAttribute(Constants.CLIENT_PORT_NAME);
+ session.removeAttribute(Constants.CLIENT_PORT_NAME);
}
} catch (Exception e) {
LOG.error("request destroyed error", e);
@@ -68,9 +70,12 @@ public class WebsocketListener implements
ServletRequestListener {
request.setAttribute(CLIENT_IP_NAME,
sre.getServletRequest().getRemoteAddr());
session.setAttribute(CLIENT_IP_NAME,
sre.getServletRequest().getRemoteAddr());
String namespace =
request.getHeader(Constants.SHENYU_NAMESPACE_ID);
+ String port = request.getHeader(Constants.CLIENT_PORT_NAME);
if (StringUtils.isNoneBlank(namespace)) {
request.setAttribute(Constants.SHENYU_NAMESPACE_ID,
namespace);
session.setAttribute(Constants.SHENYU_NAMESPACE_ID,
namespace);
+ request.setAttribute(Constants.CLIENT_PORT_NAME, port);
+ session.setAttribute(Constants.CLIENT_PORT_NAME, port);
}
}
} catch (Exception e) {
diff --git
a/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java
b/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java
index 5f5a964a32..c597a6b72e 100644
---
a/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java
+++
b/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java
@@ -905,6 +905,10 @@ public interface Constants {
*/
String SHENYU_NAMESPACE_ID = "namespaceId";
+ /**
+ * The constant Client Port.
+ */
+ String CLIENT_PORT_NAME = "ClientPort";
/**
* The constant SYS_DEFAULT_NAMESPACE_ID.
*/
diff --git
a/shenyu-register-center/shenyu-register-client/shenyu-register-client-http/src/main/java/org/apache/shenyu/register/client/http/HttpClientRegisterRepository.java
b/shenyu-register-center/shenyu-register-client/shenyu-register-client-http/src/main/java/org/apache/shenyu/register/client/http/HttpClientRegisterRepository.java
index d8615310c5..88ae116051 100644
---
a/shenyu-register-center/shenyu-register-client/shenyu-register-client-http/src/main/java/org/apache/shenyu/register/client/http/HttpClientRegisterRepository.java
+++
b/shenyu-register-center/shenyu-register-client/shenyu-register-client-http/src/main/java/org/apache/shenyu/register/client/http/HttpClientRegisterRepository.java
@@ -32,8 +32,8 @@ import
org.apache.shenyu.register.client.http.utils.RuntimeUtils;
import org.apache.shenyu.register.common.config.ShenyuRegisterCenterConfig;
import org.apache.shenyu.register.common.dto.ApiDocRegisterDTO;
import org.apache.shenyu.register.common.dto.DiscoveryConfigRegisterDTO;
-import org.apache.shenyu.register.common.dto.McpToolsRegisterDTO;
import org.apache.shenyu.register.common.dto.InstanceBeatInfoDTO;
+import org.apache.shenyu.register.common.dto.McpToolsRegisterDTO;
import org.apache.shenyu.register.common.dto.MetaDataRegisterDTO;
import org.apache.shenyu.register.common.dto.URIRegisterDTO;
import org.apache.shenyu.register.common.enums.EventType;
diff --git
a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-sync-data-center/shenyu-spring-boot-starter-sync-data-websocket/src/main/java/org/apache/shenyu/springboot/starter/sync/data/websocket/WebsocketSyncDataConfiguration.java
b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-sync-data-center/shenyu-spring-boot-starter-sync-data-websocket/src/main/java/org/apache/shenyu/springboot/starter/sync/data/websocket/WebsocketSyncDataConfiguration.java
index 200fdc8977..1af3f47dc4 100644
---
a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-sync-data-center/shenyu-spring-boot-starter-sync-data-websocket/src/main/java/org/apache/shenyu/springboot/starter/sync/data/websocket/WebsocketSyncDataConfiguration.java
+++
b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-sync-data-center/shenyu-spring-boot-starter-sync-data-websocket/src/main/java/org/apache/shenyu/springboot/starter/sync/data/websocket/WebsocketSyncDataConfiguration.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.autoconfigure.web.ServerProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -39,8 +40,8 @@ import org.springframework.context.annotation.Configuration;
import java.util.Collections;
import java.util.List;
-/**
- * Websocket sync data configuration for spring boot.
+/**
+ * Websocket sync data configuration for spring boot.
*/
@Configuration
@ConditionalOnClass(WebsocketSyncDataService.class)
@@ -60,6 +61,7 @@ public class WebsocketSyncDataConfiguration {
* @param proxySelectorSubscribers the proxySelector subscribers
* @param discoveryUpstreamSubscribers the discoveryUpstream subscribers
* @param aiProxyApiKeySubscribers the ai proxy api key subscribers
+ * @param serverProperties the serverProperties
* @return the sync data service
*/
@Bean
@@ -72,7 +74,8 @@ public class WebsocketSyncDataConfiguration {
final ObjectProvider<List<ProxySelectorDataSubscriber>>
proxySelectorSubscribers,
final ObjectProvider<List<DiscoveryUpstreamDataSubscriber>>
discoveryUpstreamSubscribers,
- final ObjectProvider<List<AiProxyApiKeyDataSubscriber>>
aiProxyApiKeySubscribers) {
+ final ObjectProvider<List<AiProxyApiKeyDataSubscriber>>
aiProxyApiKeySubscribers,
+ final ServerProperties serverProperties) {
LOGGER.info("you use websocket sync shenyu data.......");
return new WebsocketSyncDataService(
websocketConfig.getIfAvailable(WebsocketConfig::new),
@@ -82,7 +85,8 @@ public class WebsocketSyncDataConfiguration {
authSubscribers.getIfAvailable(Collections::emptyList),
proxySelectorSubscribers.getIfAvailable(Collections::emptyList),
discoveryUpstreamSubscribers.getIfAvailable(Collections::emptyList),
-
aiProxyApiKeySubscribers.getIfAvailable(Collections::emptyList));
+
aiProxyApiKeySubscribers.getIfAvailable(Collections::emptyList),
+ serverProperties);
}
/**
diff --git
a/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/HttpSyncDataService.java
b/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/HttpSyncDataService.java
index aaeae0b86b..b2d46461f0 100644
---
a/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/HttpSyncDataService.java
+++
b/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/HttpSyncDataService.java
@@ -32,10 +32,12 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
import org.apache.shenyu.common.config.ShenyuConfig;
import org.apache.shenyu.common.constant.Constants;
+import org.apache.shenyu.common.constant.InstanceTypeConstants;
import org.apache.shenyu.common.dto.ConfigData;
import org.apache.shenyu.common.enums.ConfigGroupEnum;
import org.apache.shenyu.common.exception.ShenyuException;
import org.apache.shenyu.common.utils.GsonUtils;
+import org.apache.shenyu.common.utils.SystemInfoUtils;
import org.apache.shenyu.common.utils.ThreadUtils;
import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
import org.apache.shenyu.sync.data.api.DiscoveryUpstreamDataSubscriber;
@@ -48,6 +50,7 @@ import org.apache.shenyu.sync.data.http.config.HttpConfig;
import org.apache.shenyu.sync.data.http.refresh.DataRefreshFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.util.Assert;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
@@ -87,6 +90,9 @@ public class HttpSyncDataService implements SyncDataService {
private final ShenyuConfig shenyuConfig;
+ @Value("${server.port}")
+ private int port;
+
public HttpSyncDataService(final HttpConfig httpConfig,
final PluginDataSubscriber pluginDataSubscriber,
final OkHttpClient okHttpClient,
@@ -209,6 +215,8 @@ public class HttpSyncDataService implements SyncDataService
{
Headers headers = new Headers.Builder()
.add(Constants.X_ACCESS_TOKEN,
this.accessTokenManager.getAccessToken())
.add("Content-Type", "application/x-www-form-urlencoded")
+ .add("X-Real-PORT", port + "")
+ .add(InstanceTypeConstants.BOOTSTRAP_INSTANCE_INFO,
SystemInfoUtils.getSystemInfo())
.build();
String listenerUrl = server +
Constants.SHENYU_ADMIN_PATH_CONFIGS_LISTENER;
String uri =
UriComponentsBuilder.fromHttpUrl(listenerUrl).queryParams(params).build(true).toUriString();
diff --git a/shenyu-sync-data-center/shenyu-sync-data-websocket/pom.xml
b/shenyu-sync-data-center/shenyu-sync-data-websocket/pom.xml
index fa6f81c3fc..09b47ca2c0 100644
--- a/shenyu-sync-data-center/shenyu-sync-data-websocket/pom.xml
+++ b/shenyu-sync-data-center/shenyu-sync-data-websocket/pom.xml
@@ -31,16 +31,19 @@
<artifactId>shenyu-sync-data-api</artifactId>
<version>${project.version}</version>
</dependency>
-
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>${java-websocket.version}</version>
</dependency>
-
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-web</artifactId>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/shenyu-sync-data-center/shenyu-sync-data-websocket/src/main/java/org/apache/shenyu/plugin/sync/data/websocket/WebsocketSyncDataService.java
b/shenyu-sync-data-center/shenyu-sync-data-websocket/src/main/java/org/apache/shenyu/plugin/sync/data/websocket/WebsocketSyncDataService.java
index 94adb93915..4afa7de319 100644
---
a/shenyu-sync-data-center/shenyu-sync-data-websocket/src/main/java/org/apache/shenyu/plugin/sync/data/websocket/WebsocketSyncDataService.java
+++
b/shenyu-sync-data-center/shenyu-sync-data-websocket/src/main/java/org/apache/shenyu/plugin/sync/data/websocket/WebsocketSyncDataService.java
@@ -38,6 +38,7 @@ import
org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
import org.apache.shenyu.sync.data.api.SyncDataService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.boot.autoconfigure.web.ServerProperties;
import java.net.URI;
import java.util.Iterator;
@@ -70,8 +71,7 @@ public class WebsocketSyncDataService implements
SyncDataService {
private final List<DiscoveryUpstreamDataSubscriber>
discoveryUpstreamDataSubscribers;
- private final
List<org.apache.shenyu.sync.data.api.AiProxyApiKeyDataSubscriber>
- aiProxyApiKeyDataSubscribers;
+ private final
List<org.apache.shenyu.sync.data.api.AiProxyApiKeyDataSubscriber>
aiProxyApiKeyDataSubscribers;
private final List<ShenyuWebsocketClient> clients = Lists.newArrayList();
@@ -80,7 +80,9 @@ public class WebsocketSyncDataService implements
SyncDataService {
private final Timer timer;
private TimerTask timerTask;
-
+
+ private final ServerProperties serverProperties;
+
/**
* Instantiates a new Websocket sync cache.
*
@@ -91,6 +93,7 @@ public class WebsocketSyncDataService implements
SyncDataService {
* @param authDataSubscribers the auth data subscribers
* @param proxySelectorDataSubscribers the proxy selector data subscribers
* @param discoveryUpstreamDataSubscribers the discovery upstream data
subscribers
+ * @param serverProperties serverProperties
*/
public WebsocketSyncDataService(
final WebsocketConfig websocketConfig,
@@ -101,7 +104,8 @@ public class WebsocketSyncDataService implements
SyncDataService {
final List<ProxySelectorDataSubscriber>
proxySelectorDataSubscribers,
final List<DiscoveryUpstreamDataSubscriber>
discoveryUpstreamDataSubscribers,
final
List<org.apache.shenyu.sync.data.api.AiProxyApiKeyDataSubscriber>
- aiProxyApiKeyDataSubscribers) {
+ aiProxyApiKeyDataSubscribers,
+ final ServerProperties serverProperties) {
this.timer = WheelTimerFactory.getSharedTimer();
this.websocketConfig = websocketConfig;
this.pluginDataSubscriber = pluginDataSubscriber;
@@ -111,6 +115,7 @@ public class WebsocketSyncDataService implements
SyncDataService {
this.discoveryUpstreamDataSubscribers =
discoveryUpstreamDataSubscribers;
this.aiProxyApiKeyDataSubscribers = aiProxyApiKeyDataSubscribers;
this.namespaceId = shenyuConfig.getNamespace();
+ this.serverProperties = serverProperties;
LOG.info("start init connecting...");
List<String> urls = websocketConfig.getUrls();
for (String url : urls) {
@@ -127,7 +132,8 @@ public class WebsocketSyncDataService implements
SyncDataService {
proxySelectorDataSubscribers,
discoveryUpstreamDataSubscribers,
this.aiProxyApiKeyDataSubscribers,
- namespaceId));
+ namespaceId,
+ serverProperties.getPort()));
} else {
clients.add(
new ShenyuWebsocketClient(
@@ -138,7 +144,8 @@ public class WebsocketSyncDataService implements
SyncDataService {
proxySelectorDataSubscribers,
discoveryUpstreamDataSubscribers,
this.aiProxyApiKeyDataSubscribers,
- namespaceId));
+ namespaceId,
+ serverProperties.getPort()));
}
}
LOG.info("start check task...");
@@ -149,7 +156,7 @@ public class WebsocketSyncDataService implements
SyncDataService {
}
});
}
-
+
private void masterCheck() {
if (LOG.isDebugEnabled()) {
LOG.debug("master checking task start...");
@@ -170,7 +177,7 @@ public class WebsocketSyncDataService implements
SyncDataService {
proxySelectorDataSubscribers,
discoveryUpstreamDataSubscribers,
this.aiProxyApiKeyDataSubscribers,
- namespaceId));
+ namespaceId, serverProperties.getPort()));
} else {
clients.add(
new ShenyuWebsocketClient(
@@ -181,7 +188,7 @@ public class WebsocketSyncDataService implements
SyncDataService {
proxySelectorDataSubscribers,
discoveryUpstreamDataSubscribers,
this.aiProxyApiKeyDataSubscribers,
- namespaceId));
+ namespaceId, serverProperties.getPort()));
}
}
}
diff --git
a/shenyu-sync-data-center/shenyu-sync-data-websocket/src/main/java/org/apache/shenyu/plugin/sync/data/websocket/client/ShenyuWebsocketClient.java
b/shenyu-sync-data-center/shenyu-sync-data-websocket/src/main/java/org/apache/shenyu/plugin/sync/data/websocket/client/ShenyuWebsocketClient.java
index f6ff41e85c..4913433cd5 100644
---
a/shenyu-sync-data-center/shenyu-sync-data-websocket/src/main/java/org/apache/shenyu/plugin/sync/data/websocket/client/ShenyuWebsocketClient.java
+++
b/shenyu-sync-data-center/shenyu-sync-data-websocket/src/main/java/org/apache/shenyu/plugin/sync/data/websocket/client/ShenyuWebsocketClient.java
@@ -18,6 +18,7 @@
package org.apache.shenyu.plugin.sync.data.websocket.client;
import org.apache.shenyu.common.constant.Constants;
+import org.apache.shenyu.common.constant.InstanceTypeConstants;
import org.apache.shenyu.common.constant.RunningModeConstants;
import org.apache.shenyu.common.dto.WebsocketData;
import org.apache.shenyu.common.enums.ConfigGroupEnum;
@@ -29,6 +30,7 @@ import org.apache.shenyu.common.timer.TimerTask;
import org.apache.shenyu.common.timer.WheelTimerFactory;
import org.apache.shenyu.common.utils.GsonUtils;
import org.apache.shenyu.common.utils.JsonUtils;
+import org.apache.shenyu.common.utils.SystemInfoUtils;
import
org.apache.shenyu.plugin.sync.data.websocket.handler.WebsocketDataHandler;
import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
import org.apache.shenyu.sync.data.api.DiscoveryUpstreamDataSubscriber;
@@ -72,7 +74,7 @@ public final class ShenyuWebsocketClient extends
WebSocketClient {
private volatile boolean isConnectedToMaster;
private final String namespaceId;
-
+
/**
* Instantiates a new shenyu websocket client.
*
@@ -90,10 +92,13 @@ public final class ShenyuWebsocketClient extends
WebSocketClient {
final List<ProxySelectorDataSubscriber>
proxySelectorDataSubscribers,
final List<DiscoveryUpstreamDataSubscriber>
discoveryUpstreamDataSubscribers,
final List<AiProxyApiKeyDataSubscriber>
aiProxyApiKeyDataSubscribers,
- final String namespaceId) {
+ final String namespaceId,
+ final Integer port
+ ) {
super(serverUri);
this.namespaceId = namespaceId;
- this.addHeader("namespaceId", namespaceId);
+ this.addHeader(Constants.SHENYU_NAMESPACE_ID, namespaceId);
+ this.addHeader(Constants.CLIENT_PORT_NAME, String.valueOf(port));
this.websocketDataHandler = new WebsocketDataHandler(
pluginDataSubscriber,
metaDataSubscribers,
@@ -125,11 +130,13 @@ public final class ShenyuWebsocketClient extends
WebSocketClient {
final List<ProxySelectorDataSubscriber>
proxySelectorDataSubscribers,
final List<DiscoveryUpstreamDataSubscriber>
discoveryUpstreamDataSubscribers,
final List<AiProxyApiKeyDataSubscriber>
aiProxyApiKeyDataSubscribers,
- final String namespaceId) {
+ final String namespaceId,
+ final Integer port) {
super(serverUri, headers);
this.namespaceId = namespaceId;
LOG.info("shenyu bootstrap websocket namespaceId: {}", namespaceId);
this.addHeader(Constants.SHENYU_NAMESPACE_ID, namespaceId);
+ this.addHeader(Constants.CLIENT_PORT_NAME, String.valueOf(port));
this.websocketDataHandler = new WebsocketDataHandler(
pluginDataSubscriber,
metaDataSubscribers,
@@ -234,6 +241,7 @@ public final class ShenyuWebsocketClient extends
WebSocketClient {
this.reconnectBlocking();
} else {
this.sendPing();
+ send(getInstanceInfo());
// send(DataEventTypeEnum.RUNNING_MODE.name());
LOG.debug("websocket send to [{}] ping message successful",
this.getURI());
}
@@ -241,7 +249,16 @@ public final class ShenyuWebsocketClient extends
WebSocketClient {
LOG.error("websocket connect is error :{}", e.getMessage());
}
}
-
+
+ private String getInstanceInfo() {
+ // Combine instance and host information
+ Map<String, Object> combinedInfo = Map.of(
+ InstanceTypeConstants.BOOTSTRAP_INSTANCE_INFO,
SystemInfoUtils.getSystemInfo()
+ );
+
+ return GsonUtils.getInstance().toJson(combinedInfo);
+ }
+
/**
* handle admin message.
*