This is an automated email from the ASF dual-hosted git repository.
xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git
The following commit(s) were added to refs/heads/master by this push:
new 43e22edd6 [ISSUE #4835]fix gateway proxy websocket lost the
user-defined closeStatus (code and reason). (#4844)
43e22edd6 is described below
commit 43e22edd6f44adaf85c975401806b4b145c90d1c
Author: lianjunwei <[email protected]>
AuthorDate: Mon Jul 24 17:30:50 2023 +0800
[ISSUE #4835]fix gateway proxy websocket lost the user-defined closeStatus
(code and reason). (#4844)
* apidoc sql
* refactor
* commit
* [Task] Shenyu-admin: Fix API document failed to build because of NPE.
* [Task] Shenyu-admin: Fix API document failed to build because of NPE.
* solve conflicts, modify LICENSE.
* delete useless code.
* delete useless code.
* commit
* [ISSUE #3843]admin apidoc fix: the required attribute prompt is incorrect
when micro service parameter uses "@ApiModelProperty".
* commit
* [shenyu-examples]add swagger to the example project to test the apidoc
function of the gateway management system.
* commit
* commit
* commit
* [ISSUE #4690]Supports gzip compression in response to HTTP requests.
* [examples]Add Swagger sample project to demonstrate automatic pull
interface documentation.
* delete example
* [fix#4835]Shenyu gateway proxy websocket lost the user-defined
closeStatus (code and reason).
* delete useless code.
* delete useless code.
* delete useless code.
---------
Co-authored-by: lianjunwei <[email protected]>
Co-authored-by: dragon-zhang <[email protected]>
Co-authored-by: xiaoyu <[email protected]>
---
.../websocket/handler/HttpAuthHandler.java | 14 ++-
.../shenyu/plugin/base/AbstractShenyuPlugin.java | 2 +-
.../shenyu/plugin/websocket/WebSocketPlugin.java | 109 +++++++++++++++------
3 files changed, 90 insertions(+), 35 deletions(-)
diff --git
a/shenyu-examples/shenyu-examples-websocket/shenyu-example-spring-native-websocket/src/main/java/org/apache/shenyu/examples/websocket/handler/HttpAuthHandler.java
b/shenyu-examples/shenyu-examples-websocket/shenyu-example-spring-native-websocket/src/main/java/org/apache/shenyu/examples/websocket/handler/HttpAuthHandler.java
index e5eba39a2..8545139ad 100644
---
a/shenyu-examples/shenyu-examples-websocket/shenyu-example-spring-native-websocket/src/main/java/org/apache/shenyu/examples/websocket/handler/HttpAuthHandler.java
+++
b/shenyu-examples/shenyu-examples-websocket/shenyu-example-spring-native-websocket/src/main/java/org/apache/shenyu/examples/websocket/handler/HttpAuthHandler.java
@@ -43,6 +43,7 @@ public class HttpAuthHandler extends TextWebSocketHandler {
*/
@Override
public void afterConnectionEstablished(final WebSocketSession session)
throws Exception {
+ LOG.info("The connection has been established successfully.");
Object token = session.getAttributes().get("token");
if (Objects.nonNull(token)) {
// The user is successfully connected and put into the online user
cache.
@@ -64,20 +65,27 @@ public class HttpAuthHandler extends TextWebSocketHandler {
// Get the message from the client.
String payload = message.getPayload();
Object token = session.getAttributes().get("token");
- LOG.info("server received " + token + " sent " + payload);
- session.sendMessage(new TextMessage("apache shenyu server send to " +
token + " message : -> " + payload));
+ LOG.info("server received {}, sent {}", token, payload);
+ boolean isTestClose = (Objects.nonNull(token) && Objects.equals(token,
"testCloseStatus"))
+ || (Objects.nonNull(payload) && Objects.equals(payload,
"testCloseStatus"));
+ if (isTestClose) {
+ session.close(new CloseStatus(4400, "test:apache shenyu server
close, return closeStatus"));
+ } else {
+ session.sendMessage(new TextMessage("apache shenyu server send to
" + token + " message : -> " + payload));
+ }
}
/**
* when socket disconnected.
*
- * @param session session
+ * @param session session
* @param status close status
* @throws Exception exception
*/
@Override
public void afterConnectionClosed(final WebSocketSession session, final
CloseStatus status) throws Exception {
Object token = session.getAttributes().get("token");
+ LOG.info("closed with status: {}" + status);
if (Objects.nonNull(token)) {
// The user exits and removes the cache.
WsSessionManager.remove(token.toString());
diff --git
a/shenyu-plugin/shenyu-plugin-base/src/main/java/org/apache/shenyu/plugin/base/AbstractShenyuPlugin.java
b/shenyu-plugin/shenyu-plugin-base/src/main/java/org/apache/shenyu/plugin/base/AbstractShenyuPlugin.java
index 7a2bc66d4..8020a0e88 100644
---
a/shenyu-plugin/shenyu-plugin-base/src/main/java/org/apache/shenyu/plugin/base/AbstractShenyuPlugin.java
+++
b/shenyu-plugin/shenyu-plugin-base/src/main/java/org/apache/shenyu/plugin/base/AbstractShenyuPlugin.java
@@ -323,7 +323,7 @@ public abstract class AbstractShenyuPlugin implements
ShenyuPlugin {
SelectorData selectorData = null;
ShenyuTrieNode shenyuTrieNode = selectorTrie.match(path, pluginName);
if (Objects.nonNull(shenyuTrieNode)) {
- LogUtils.info(LOG, "{} selector match path from shenyu trie");
+ LogUtils.info(LOG, "{} selector match path from shenyu trie,
path:{}", pluginName, path);
List<?> collection = shenyuTrieNode.getPathCache().get(pluginName);
if (CollectionUtils.isNotEmpty(collection)) {
Pair<Boolean, SelectorData> selectorDataPair;
diff --git
a/shenyu-plugin/shenyu-plugin-proxy/shenyu-plugin-websocket/src/main/java/org/apache/shenyu/plugin/websocket/WebSocketPlugin.java
b/shenyu-plugin/shenyu-plugin-proxy/shenyu-plugin-websocket/src/main/java/org/apache/shenyu/plugin/websocket/WebSocketPlugin.java
index d4fa1b496..3c91715bf 100644
---
a/shenyu-plugin/shenyu-plugin-proxy/shenyu-plugin-websocket/src/main/java/org/apache/shenyu/plugin/websocket/WebSocketPlugin.java
+++
b/shenyu-plugin/shenyu-plugin-proxy/shenyu-plugin-websocket/src/main/java/org/apache/shenyu/plugin/websocket/WebSocketPlugin.java
@@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.lang.NonNull;
import org.springframework.util.StringUtils;
+import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
@@ -62,15 +63,15 @@ import java.util.stream.Collectors;
* The type Web socket plugin.
*/
public class WebSocketPlugin extends AbstractShenyuPlugin {
-
+
private static final Logger LOG =
LoggerFactory.getLogger(WebSocketPlugin.class);
-
+
private static final String SEC_WEB_SOCKET_PROTOCOL =
"Sec-WebSocket-Protocol";
-
+
private final WebSocketClient webSocketClient;
-
+
private final WebSocketService webSocketService;
-
+
/**
* Instantiates a new Web socket plugin.
*
@@ -81,7 +82,7 @@ public class WebSocketPlugin extends AbstractShenyuPlugin {
this.webSocketClient = webSocketClient;
this.webSocketService = webSocketService;
}
-
+
@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final
ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) {
final List<Upstream> upstreamList =
UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());
@@ -94,7 +95,7 @@ public class WebSocketPlugin extends AbstractShenyuPlugin {
final String ip =
Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
Upstream upstream = LoadBalancerFactory.selector(upstreamList,
ruleHandle.getLoadBalance(), ip);
if (Objects.isNull(upstream)) {
- LOG.error("websocket has no upstream");
+ LOG.error("websocket has no upstream, error:{}", rule);
Object error = ShenyuResultWrap.error(exchange,
ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL);
return WebFluxResultUtils.result(exchange, error);
}
@@ -104,11 +105,11 @@ public class WebSocketPlugin extends AbstractShenyuPlugin
{
return this.webSocketService.handleRequest(exchange, new
ShenyuWebSocketHandler(
wsRequestUrl, this.webSocketClient, filterHeaders(headers),
buildWsProtocols(headers)));
}
-
+
private WebSocketRuleHandle buildRuleHandle(final RuleData rule) {
return
WebSocketPluginDataHandler.CACHED_HANDLE.get().obtainHandle(CacheKeyUtils.INST.getKey(rule));
}
-
+
private String buildWsRealPath(final ServerWebExchange exchange, final
Upstream upstream, final ShenyuContext shenyuContext) {
String protocol = upstream.getProtocol();
if (!StringUtils.hasLength(protocol)) {
@@ -120,7 +121,7 @@ public class WebSocketPlugin extends AbstractShenyuPlugin {
}
return protocol + upstream.getUrl() + path;
}
-
+
private List<String> buildWsProtocols(final HttpHeaders headers) {
List<String> protocols = headers.get(SEC_WEB_SOCKET_PROTOCOL);
if (CollectionUtils.isEmpty(protocols)) {
@@ -131,7 +132,7 @@ public class WebSocketPlugin extends AbstractShenyuPlugin {
.map(String::trim)
.collect(Collectors.toList());
}
-
+
private HttpHeaders filterHeaders(final HttpHeaders headers) {
HttpHeaders filtered = new HttpHeaders();
headers.entrySet().stream()
@@ -141,12 +142,52 @@ public class WebSocketPlugin extends AbstractShenyuPlugin
{
header.getValue()));
return filtered;
}
-
+
+ // see https://github.com/spring-cloud/spring-cloud-gateway/pull/2254
+ private static CloseStatus adaptCloseStatus(final CloseStatus closeStatus)
{
+ int code = closeStatus.getCode();
+ if (code > 2999 && code < 5000) {
+ return closeStatus;
+ }
+ switch (code) {
+ case 1000:
+ case 1001:
+ case 1002:
+ case 1003:
+ case 1007:
+ case 1008:
+ case 1009:
+ case 1010:
+ case 1011:
+ return closeStatus;
+ case 1004:
+ // Should not be used in a close frame
+ // RESERVED;
+ case 1005:
+ // Should not be used in a close frame
+ // return CloseStatus.NO_STATUS_CODE;
+ case 1006:
+ // Should not be used in a close frame
+ // return CloseStatus.NO_CLOSE_FRAME;
+ case 1012:
+ // Not in RFC6455
+ // return CloseStatus.SERVICE_RESTARTED;
+ case 1013:
+ // Not in RFC6455
+ // return CloseStatus.SERVICE_OVERLOAD;
+ case 1015:
+ // Should not be used in a close frame
+ // return CloseStatus.TLS_HANDSHAKE_FAILURE;
+ default:
+ return CloseStatus.PROTOCOL_ERROR;
+ }
+ }
+
@Override
public String named() {
return PluginEnum.WEB_SOCKET.getName();
}
-
+
/**
* plugin is execute.
*
@@ -156,32 +197,32 @@ public class WebSocketPlugin extends AbstractShenyuPlugin
{
public boolean skip(final ServerWebExchange exchange) {
return skipExcept(exchange, RpcTypeEnum.WEB_SOCKET);
}
-
+
@Override
protected Mono<Void> handleSelectorIfNull(final String pluginName, final
ServerWebExchange exchange, final ShenyuPluginChain chain) {
return WebFluxResultUtils.noSelectorResult(pluginName, exchange);
}
-
+
@Override
protected Mono<Void> handleRuleIfNull(final String pluginName, final
ServerWebExchange exchange, final ShenyuPluginChain chain) {
return WebFluxResultUtils.noRuleResult(pluginName, exchange);
}
-
+
@Override
public int getOrder() {
return PluginEnum.WEB_SOCKET.getCode();
}
-
+
private static class ShenyuWebSocketHandler implements WebSocketHandler {
-
+
private final WebSocketClient client;
-
+
private final URI url;
-
+
private final HttpHeaders headers;
-
+
private final List<String> subProtocols;
-
+
/**
* Instantiates a new shenyu web socket handler.
*
@@ -198,30 +239,36 @@ public class WebSocketPlugin extends AbstractShenyuPlugin
{
this.headers = headers;
this.subProtocols = ObjectUtils.defaultIfNull(protocols,
Collections.emptyList());
}
-
+
@NonNull
@Override
public List<String> getSubProtocols() {
return this.subProtocols;
}
-
+
@NonNull
@Override
public Mono<Void> handle(@NonNull final WebSocketSession session) {
// pass headers along so custom headers can be sent through
return client.execute(url, this.headers, new WebSocketHandler() {
-
+
@NonNull
@Override
- public Mono<Void> handle(@NonNull final WebSocketSession
webSocketSession) {
+ public Mono<Void> handle(@NonNull final WebSocketSession
proxySocketSession) {
+ Mono<Void> serverClose =
proxySocketSession.closeStatus().filter(it -> session.isOpen())
+
.map(WebSocketPlugin::adaptCloseStatus).flatMap(session::close);
+ Mono<Void> proxyClose = session.closeStatus().filter(it ->
proxySocketSession.isOpen())
+
.map(WebSocketPlugin::adaptCloseStatus).flatMap(proxySocketSession::close);
// Use retain() for Reactor Netty
- Mono<Void> sessionSend = webSocketSession
-
.send(session.receive().doOnNext(WebSocketMessage::retain));
+ Mono<Void> proxySessionSend = proxySocketSession
+
.send(session.receive().doOnNext(WebSocketMessage::retain));
Mono<Void> serverSessionSend = session.send(
-
webSocketSession.receive().doOnNext(WebSocketMessage::retain));
- return Mono.zip(sessionSend, serverSessionSend).then();
+
proxySocketSession.receive().doOnNext(WebSocketMessage::retain));
+ // Ensure closeStatus from one propagates to the other
+ Mono.when(serverClose, proxyClose).subscribe();
+ return Mono.zip(proxySessionSend,
serverSessionSend).then();
}
-
+
@NonNull
@Override
public List<String> getSubProtocols() {