This is an automated email from the ASF dual-hosted git repository.

xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git


The following commit(s) were added to refs/heads/master by this push:
     new 43e22edd6 [ISSUE #4835]fix gateway proxy websocket lost the 
user-defined closeStatus (code and reason). (#4844)
43e22edd6 is described below

commit 43e22edd6f44adaf85c975401806b4b145c90d1c
Author: lianjunwei <[email protected]>
AuthorDate: Mon Jul 24 17:30:50 2023 +0800

    [ISSUE #4835]fix gateway proxy websocket lost the user-defined closeStatus 
(code and reason). (#4844)
    
    * apidoc sql
    
    * refactor
    
    * commit
    
    * [Task] Shenyu-admin: Fix API document failed to build because of NPE.
    
    * [Task] Shenyu-admin: Fix API document failed to build because of NPE.
    
    * solve conflicts, modify LICENSE.
    
    * delete useless code.
    
    * delete useless code.
    
    * commit
    
    * [ISSUE #3843]admin apidoc fix: the required attribute prompt is incorrect 
when micro service parameter uses "@ApiModelProperty".
    
    * commit
    
    * [shenyu-examples]add swagger to the example project to test the apidoc 
function of the gateway management system.
    
    * commit
    
    * commit
    
    * commit
    
    * [ISSUE #4690]Supports gzip compression in response to HTTP requests.
    
    * [examples]Add Swagger sample project to demonstrate automatic pull 
interface documentation.
    
    * delete example
    
    * [fix#4835]Shenyu gateway proxy websocket lost the user-defined 
closeStatus (code and reason).
    
    * delete useless code.
    
    * delete useless code.
    
    * delete useless code.
    
    ---------
    
    Co-authored-by: lianjunwei <[email protected]>
    Co-authored-by: dragon-zhang <[email protected]>
    Co-authored-by: xiaoyu <[email protected]>
---
 .../websocket/handler/HttpAuthHandler.java         |  14 ++-
 .../shenyu/plugin/base/AbstractShenyuPlugin.java   |   2 +-
 .../shenyu/plugin/websocket/WebSocketPlugin.java   | 109 +++++++++++++++------
 3 files changed, 90 insertions(+), 35 deletions(-)

diff --git 
a/shenyu-examples/shenyu-examples-websocket/shenyu-example-spring-native-websocket/src/main/java/org/apache/shenyu/examples/websocket/handler/HttpAuthHandler.java
 
b/shenyu-examples/shenyu-examples-websocket/shenyu-example-spring-native-websocket/src/main/java/org/apache/shenyu/examples/websocket/handler/HttpAuthHandler.java
index e5eba39a2..8545139ad 100644
--- 
a/shenyu-examples/shenyu-examples-websocket/shenyu-example-spring-native-websocket/src/main/java/org/apache/shenyu/examples/websocket/handler/HttpAuthHandler.java
+++ 
b/shenyu-examples/shenyu-examples-websocket/shenyu-example-spring-native-websocket/src/main/java/org/apache/shenyu/examples/websocket/handler/HttpAuthHandler.java
@@ -43,6 +43,7 @@ public class HttpAuthHandler extends TextWebSocketHandler {
      */
     @Override
     public void afterConnectionEstablished(final WebSocketSession session) 
throws Exception {
+        LOG.info("The connection has been established successfully.");
         Object token = session.getAttributes().get("token");
         if (Objects.nonNull(token)) {
             // The user is successfully connected and put into the online user 
cache.
@@ -64,20 +65,27 @@ public class HttpAuthHandler extends TextWebSocketHandler {
         // Get the message from the client.
         String payload = message.getPayload();
         Object token = session.getAttributes().get("token");
-        LOG.info("server received " + token + " sent " + payload);
-        session.sendMessage(new TextMessage("apache shenyu server send to " + 
token + " message : -> " + payload));
+        LOG.info("server received {}, sent {}", token, payload);
+        boolean isTestClose = (Objects.nonNull(token) && Objects.equals(token, 
"testCloseStatus"))
+            || (Objects.nonNull(payload) && Objects.equals(payload, 
"testCloseStatus"));
+        if (isTestClose) {
+            session.close(new CloseStatus(4400, "test:apache shenyu server 
close, return closeStatus"));
+        } else {
+            session.sendMessage(new TextMessage("apache shenyu server send to 
" + token + " message : -> " + payload));
+        }
     }
 
     /**
      * when socket disconnected.
      *
-     * @param session  session
+     * @param session session
      * @param status  close status
      * @throws Exception exception
      */
     @Override
     public void afterConnectionClosed(final WebSocketSession session, final 
CloseStatus status) throws Exception {
         Object token = session.getAttributes().get("token");
+        LOG.info("closed with status: {}" + status);
         if (Objects.nonNull(token)) {
             // The user exits and removes the cache.
             WsSessionManager.remove(token.toString());
diff --git 
a/shenyu-plugin/shenyu-plugin-base/src/main/java/org/apache/shenyu/plugin/base/AbstractShenyuPlugin.java
 
b/shenyu-plugin/shenyu-plugin-base/src/main/java/org/apache/shenyu/plugin/base/AbstractShenyuPlugin.java
index 7a2bc66d4..8020a0e88 100644
--- 
a/shenyu-plugin/shenyu-plugin-base/src/main/java/org/apache/shenyu/plugin/base/AbstractShenyuPlugin.java
+++ 
b/shenyu-plugin/shenyu-plugin-base/src/main/java/org/apache/shenyu/plugin/base/AbstractShenyuPlugin.java
@@ -323,7 +323,7 @@ public abstract class AbstractShenyuPlugin implements 
ShenyuPlugin {
         SelectorData selectorData = null;
         ShenyuTrieNode shenyuTrieNode = selectorTrie.match(path, pluginName);
         if (Objects.nonNull(shenyuTrieNode)) {
-            LogUtils.info(LOG, "{} selector match path from shenyu trie");
+            LogUtils.info(LOG, "{} selector match path from shenyu trie, 
path:{}", pluginName, path);
             List<?> collection = shenyuTrieNode.getPathCache().get(pluginName);
             if (CollectionUtils.isNotEmpty(collection)) {
                 Pair<Boolean, SelectorData> selectorDataPair;
diff --git 
a/shenyu-plugin/shenyu-plugin-proxy/shenyu-plugin-websocket/src/main/java/org/apache/shenyu/plugin/websocket/WebSocketPlugin.java
 
b/shenyu-plugin/shenyu-plugin-proxy/shenyu-plugin-websocket/src/main/java/org/apache/shenyu/plugin/websocket/WebSocketPlugin.java
index d4fa1b496..3c91715bf 100644
--- 
a/shenyu-plugin/shenyu-plugin-proxy/shenyu-plugin-websocket/src/main/java/org/apache/shenyu/plugin/websocket/WebSocketPlugin.java
+++ 
b/shenyu-plugin/shenyu-plugin-proxy/shenyu-plugin-websocket/src/main/java/org/apache/shenyu/plugin/websocket/WebSocketPlugin.java
@@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory;
 import org.springframework.http.HttpHeaders;
 import org.springframework.lang.NonNull;
 import org.springframework.util.StringUtils;
+import org.springframework.web.reactive.socket.CloseStatus;
 import org.springframework.web.reactive.socket.WebSocketHandler;
 import org.springframework.web.reactive.socket.WebSocketMessage;
 import org.springframework.web.reactive.socket.WebSocketSession;
@@ -62,15 +63,15 @@ import java.util.stream.Collectors;
  * The type Web socket plugin.
  */
 public class WebSocketPlugin extends AbstractShenyuPlugin {
-    
+
     private static final Logger LOG = 
LoggerFactory.getLogger(WebSocketPlugin.class);
-    
+
     private static final String SEC_WEB_SOCKET_PROTOCOL = 
"Sec-WebSocket-Protocol";
-    
+
     private final WebSocketClient webSocketClient;
-    
+
     private final WebSocketService webSocketService;
-    
+
     /**
      * Instantiates a new Web socket plugin.
      *
@@ -81,7 +82,7 @@ public class WebSocketPlugin extends AbstractShenyuPlugin {
         this.webSocketClient = webSocketClient;
         this.webSocketService = webSocketService;
     }
-    
+
     @Override
     protected Mono<Void> doExecute(final ServerWebExchange exchange, final 
ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) {
         final List<Upstream> upstreamList = 
UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());
@@ -94,7 +95,7 @@ public class WebSocketPlugin extends AbstractShenyuPlugin {
         final String ip = 
Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
         Upstream upstream = LoadBalancerFactory.selector(upstreamList, 
ruleHandle.getLoadBalance(), ip);
         if (Objects.isNull(upstream)) {
-            LOG.error("websocket has no upstream");
+            LOG.error("websocket has no upstream, error:{}", rule);
             Object error = ShenyuResultWrap.error(exchange, 
ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL);
             return WebFluxResultUtils.result(exchange, error);
         }
@@ -104,11 +105,11 @@ public class WebSocketPlugin extends AbstractShenyuPlugin 
{
         return this.webSocketService.handleRequest(exchange, new 
ShenyuWebSocketHandler(
                 wsRequestUrl, this.webSocketClient, filterHeaders(headers), 
buildWsProtocols(headers)));
     }
-    
+
     private WebSocketRuleHandle buildRuleHandle(final RuleData rule) {
         return 
WebSocketPluginDataHandler.CACHED_HANDLE.get().obtainHandle(CacheKeyUtils.INST.getKey(rule));
     }
-    
+
     private String buildWsRealPath(final ServerWebExchange exchange, final 
Upstream upstream, final ShenyuContext shenyuContext) {
         String protocol = upstream.getProtocol();
         if (!StringUtils.hasLength(protocol)) {
@@ -120,7 +121,7 @@ public class WebSocketPlugin extends AbstractShenyuPlugin {
         }
         return protocol + upstream.getUrl() + path;
     }
-    
+
     private List<String> buildWsProtocols(final HttpHeaders headers) {
         List<String> protocols = headers.get(SEC_WEB_SOCKET_PROTOCOL);
         if (CollectionUtils.isEmpty(protocols)) {
@@ -131,7 +132,7 @@ public class WebSocketPlugin extends AbstractShenyuPlugin {
                 .map(String::trim)
                 .collect(Collectors.toList());
     }
-    
+
     private HttpHeaders filterHeaders(final HttpHeaders headers) {
         HttpHeaders filtered = new HttpHeaders();
         headers.entrySet().stream()
@@ -141,12 +142,52 @@ public class WebSocketPlugin extends AbstractShenyuPlugin 
{
                         header.getValue()));
         return filtered;
     }
-    
+
+    // see https://github.com/spring-cloud/spring-cloud-gateway/pull/2254
+    private static CloseStatus adaptCloseStatus(final CloseStatus closeStatus) 
{
+        int code = closeStatus.getCode();
+        if (code > 2999 && code < 5000) {
+            return closeStatus;
+        }
+        switch (code) {
+            case 1000:
+            case 1001:
+            case 1002:
+            case 1003:
+            case 1007:
+            case 1008:
+            case 1009:
+            case 1010:
+            case 1011:
+                return closeStatus;
+            case 1004:
+                // Should not be used in a close frame
+                // RESERVED;
+            case 1005:
+                // Should not be used in a close frame
+                // return CloseStatus.NO_STATUS_CODE;
+            case 1006:
+                // Should not be used in a close frame
+                // return CloseStatus.NO_CLOSE_FRAME;
+            case 1012:
+                // Not in RFC6455
+                // return CloseStatus.SERVICE_RESTARTED;
+            case 1013:
+                // Not in RFC6455
+                // return CloseStatus.SERVICE_OVERLOAD;
+            case 1015:
+                // Should not be used in a close frame
+                // return CloseStatus.TLS_HANDSHAKE_FAILURE;
+            default:
+                return CloseStatus.PROTOCOL_ERROR;
+        }
+    }
+
     @Override
     public String named() {
         return PluginEnum.WEB_SOCKET.getName();
     }
-    
+
     /**
      * plugin is execute.
      *
@@ -156,32 +197,32 @@ public class WebSocketPlugin extends AbstractShenyuPlugin 
{
     public boolean skip(final ServerWebExchange exchange) {
         return skipExcept(exchange, RpcTypeEnum.WEB_SOCKET);
     }
-    
+
     @Override
     protected Mono<Void> handleSelectorIfNull(final String pluginName, final 
ServerWebExchange exchange, final ShenyuPluginChain chain) {
         return WebFluxResultUtils.noSelectorResult(pluginName, exchange);
     }
-    
+
     @Override
     protected Mono<Void> handleRuleIfNull(final String pluginName, final 
ServerWebExchange exchange, final ShenyuPluginChain chain) {
         return WebFluxResultUtils.noRuleResult(pluginName, exchange);
     }
-    
+
     @Override
     public int getOrder() {
         return PluginEnum.WEB_SOCKET.getCode();
     }
-    
+
     private static class ShenyuWebSocketHandler implements WebSocketHandler {
-        
+
         private final WebSocketClient client;
-        
+
         private final URI url;
-        
+
         private final HttpHeaders headers;
-        
+
         private final List<String> subProtocols;
-        
+
         /**
          * Instantiates a new shenyu web socket handler.
          *
@@ -198,30 +239,36 @@ public class WebSocketPlugin extends AbstractShenyuPlugin 
{
             this.headers = headers;
             this.subProtocols = ObjectUtils.defaultIfNull(protocols, 
Collections.emptyList());
         }
-        
+
         @NonNull
         @Override
         public List<String> getSubProtocols() {
             return this.subProtocols;
         }
-        
+
         @NonNull
         @Override
         public Mono<Void> handle(@NonNull final WebSocketSession session) {
             // pass headers along so custom headers can be sent through
             return client.execute(url, this.headers, new WebSocketHandler() {
-                
+
                 @NonNull
                 @Override
-                public Mono<Void> handle(@NonNull final WebSocketSession 
webSocketSession) {
+                public Mono<Void> handle(@NonNull final WebSocketSession 
proxySocketSession) {
+                    Mono<Void> serverClose = 
proxySocketSession.closeStatus().filter(it -> session.isOpen())
+                        
.map(WebSocketPlugin::adaptCloseStatus).flatMap(session::close);
+                    Mono<Void> proxyClose = session.closeStatus().filter(it -> 
proxySocketSession.isOpen())
+                        
.map(WebSocketPlugin::adaptCloseStatus).flatMap(proxySocketSession::close);
                     // Use retain() for Reactor Netty
-                    Mono<Void> sessionSend = webSocketSession
-                            
.send(session.receive().doOnNext(WebSocketMessage::retain));
+                    Mono<Void> proxySessionSend = proxySocketSession
+                        
.send(session.receive().doOnNext(WebSocketMessage::retain));
                     Mono<Void> serverSessionSend = session.send(
-                            
webSocketSession.receive().doOnNext(WebSocketMessage::retain));
-                    return Mono.zip(sessionSend, serverSessionSend).then();
+                        
proxySocketSession.receive().doOnNext(WebSocketMessage::retain));
+                    // Ensure closeStatus from one propagates to the other
+                    Mono.when(serverClose, proxyClose).subscribe();
+                    return Mono.zip(proxySessionSend, 
serverSessionSend).then();
                 }
-                
+
                 @NonNull
                 @Override
                 public List<String> getSubProtocols() {

Reply via email to