This is an automated email from the ASF dual-hosted git repository.

liuhongyu pushed a commit to branch fix/fix_mcp_streamable
in repository https://gitbox.apache.org/repos/asf/shenyu.git

commit e4608918264cf4274536181d18c9a657ebcfb9a5
Author: liuhy <[email protected]>
AuthorDate: Thu Feb 5 14:47:01 2026 +0800

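    Fix MCP Streamable HTTP transport: upgrade MCP SDK to 0.17.0 / Spring AI to 1.1.2, migrate to McpJsonMapper, register streamable transports, and accept protocol version 2025-11-25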
---
 pom.xml                                            |  4 +-
 shenyu-plugin/shenyu-plugin-mcp-server/pom.xml     |  4 ++
 .../shenyu/plugin/mcp/server/McpServerPlugin.java  | 15 ++++++--
 .../server/handler/McpServerPluginDataHandler.java |  5 +++
 .../ShenyuSseServerTransportProvider.java          | 17 ++++++---
 ...henyuStreamableHttpServerTransportProvider.java | 44 +++++++++++++---------
 .../pom.xml                                        |  4 ++
 7 files changed, 64 insertions(+), 29 deletions(-)

diff --git a/pom.xml b/pom.xml
index e510a85d0a..d53ade0ddd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -186,8 +186,8 @@
         <wasmtime-java.version>0.19.0</wasmtime-java.version>
         <bcprov-jdk18on.version>1.78</bcprov-jdk18on.version>
         <oceanbase.version>2.4.12</oceanbase.version>
-        <spring-ai.version>1.0.0</spring-ai.version>
-        <mcp.version>0.10.0</mcp.version>
+        <spring-ai.version>1.1.2</spring-ai.version>
+        <mcp.version>0.17.0</mcp.version>
         <swagger-parser.version>2.1.30</swagger-parser.version>
         <!-- dependency version end -->
     </properties>
diff --git a/shenyu-plugin/shenyu-plugin-mcp-server/pom.xml 
b/shenyu-plugin/shenyu-plugin-mcp-server/pom.xml
index 1f69394e53..a29abdc65b 100644
--- a/shenyu-plugin/shenyu-plugin-mcp-server/pom.xml
+++ b/shenyu-plugin/shenyu-plugin-mcp-server/pom.xml
@@ -69,6 +69,10 @@
             <groupId>io.modelcontextprotocol.sdk</groupId>
             <artifactId>mcp-spring-webflux</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.modelcontextprotocol.sdk</groupId>
+            <artifactId>mcp-json-jackson2</artifactId>
+        </dependency>
 <!--        <dependency>-->
 <!--            <groupId>org.springframework.ai</groupId>-->
 <!--            <artifactId>spring-ai-starter-mcp-server</artifactId>-->
diff --git 
a/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/McpServerPlugin.java
 
b/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/McpServerPlugin.java
index fce5ac904c..ec540683c7 100644
--- 
a/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/McpServerPlugin.java
+++ 
b/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/McpServerPlugin.java
@@ -143,8 +143,17 @@ public class McpServerPlugin extends AbstractShenyuPlugin {
         LOG.debug("Processing MCP request with URI: {}", uri);
 
         if (!shenyuMcpServerManager.canRoute(uri)) {
-            LOG.debug("URI not handled by MCP server, continuing chain: {}", 
uri);
-            return chain.execute(exchange);
+            ShenyuMcpServer server = 
McpServerPluginDataHandler.CACHED_SERVER.get().obtainHandle(selector.getId());
+            if (Objects.nonNull(server)) {
+                String serverPath = server.getPath();
+                String messageEndpoint = server.getMessageEndpoint();
+                
shenyuMcpServerManager.getOrCreateMcpServerTransport(serverPath, 
messageEndpoint);
+                
shenyuMcpServerManager.getOrCreateStreamableHttpTransport(serverPath + 
STREAMABLE_HTTP_PATH);
+            }
+            if (!shenyuMcpServerManager.canRoute(uri)) {
+                LOG.debug("URI not handled by MCP server, continuing chain: 
{}", uri);
+                return chain.execute(exchange);
+            }
         }
 
         LOG.debug("Handling MCP request for URI: {}", uri);
@@ -572,7 +581,7 @@ public class McpServerPlugin extends AbstractShenyuPlugin {
     private void setCorsHeaders(final ServerWebExchange exchange) {
         exchange.getResponse().getHeaders().set("Access-Control-Allow-Origin", 
"*");
         exchange.getResponse().getHeaders().set("Access-Control-Allow-Headers",
-                "Content-Type, Mcp-Session-Id, Authorization, Last-Event-ID");
+                "Content-Type, Mcp-Session-Id, Authorization, Last-Event-ID, 
Mcp-Protocol-Version");
         exchange.getResponse().getHeaders().set("Access-Control-Allow-Methods",
                 "GET, POST, OPTIONS");
     }
diff --git 
a/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/handler/McpServerPluginDataHandler.java
 
b/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/handler/McpServerPluginDataHandler.java
index 97c970d4fa..b660e9df84 100644
--- 
a/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/handler/McpServerPluginDataHandler.java
+++ 
b/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/handler/McpServerPluginDataHandler.java
@@ -58,6 +58,8 @@ public class McpServerPluginDataHandler implements 
PluginDataHandler {
     private static final String SLASH = "/";
 
     private static final String STAR = "/**";
+    
+    private static final String STREAMABLE_HTTP_PATH = "/streamablehttp";
 
     private final ShenyuMcpServerManager shenyuMcpServerManager;
 
@@ -95,6 +97,9 @@ public class McpServerPluginDataHandler implements 
PluginDataHandler {
         if (StringUtils.isNotBlank(uri) && 
!shenyuMcpServerManager.hasMcpServer(uri)) {
             shenyuMcpServerManager.getOrCreateMcpServerTransport(uri, 
messageEndpoint);
         }
+        if (StringUtils.isNotBlank(path)) {
+            shenyuMcpServerManager.getOrCreateStreamableHttpTransport(path + 
STREAMABLE_HTTP_PATH);
+        }
 
         // the update is also need to clean, but there is no way to
         // distinguish between crate and update, so it is always clean
diff --git 
a/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/transport/ShenyuSseServerTransportProvider.java
 
b/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/transport/ShenyuSseServerTransportProvider.java
index f492ae1881..b0bccdf0ef 100644
--- 
a/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/transport/ShenyuSseServerTransportProvider.java
+++ 
b/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/transport/ShenyuSseServerTransportProvider.java
@@ -17,8 +17,10 @@
 
 package org.apache.shenyu.plugin.mcp.server.transport;
 
-import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import io.modelcontextprotocol.json.McpJsonMapper;
+import io.modelcontextprotocol.json.TypeRef;
+import io.modelcontextprotocol.json.jackson.JacksonMcpJsonMapper;
 import io.modelcontextprotocol.spec.McpError;
 import io.modelcontextprotocol.spec.McpSchema;
 import io.modelcontextprotocol.spec.McpServerSession;
@@ -71,6 +73,8 @@ public class ShenyuSseServerTransportProvider implements 
McpServerTransportProvi
 
     private final ObjectMapper objectMapper;
 
+    private final McpJsonMapper jsonMapper;
+
     /**
      * Base URL for the message endpoint. This is used to construct the full 
URL for
      * clients to send their JSON-RPC messages.
@@ -153,6 +157,7 @@ public class ShenyuSseServerTransportProvider implements 
McpServerTransportProvi
         Assert.notNull(sseEndpoint, "SSE endpoint must not be null");
 
         this.objectMapper = objectMapper;
+        this.jsonMapper = new JacksonMcpJsonMapper(this.objectMapper);
         this.baseUrl = baseUrl;
         this.messageEndpoint = messageEndpoint;
         this.sseEndpoint = sseEndpoint;
@@ -349,7 +354,7 @@ public class ShenyuSseServerTransportProvider implements 
McpServerTransportProvi
 
         return request.bodyToMono(String.class).flatMap(body -> {
             try {
-                McpSchema.JSONRPCMessage message = 
McpSchema.deserializeJsonRpcMessage(objectMapper, body);
+                McpSchema.JSONRPCMessage message = 
McpSchema.deserializeJsonRpcMessage(jsonMapper, body);
                 return session.handle(message).flatMap(response -> 
ServerResponse.ok().build()).onErrorResume(error -> {
                     LOGGER.error("Error processing  message: {}", 
error.getMessage());
                     // instead of signalling the error, just respond with 200 
OK
@@ -397,7 +402,7 @@ public class ShenyuSseServerTransportProvider implements 
McpServerTransportProvi
                 .flatMap(body -> {
                     try {
                         LOGGER.debug("Received message body: {}", body);
-                        McpSchema.JSONRPCMessage message = 
McpSchema.deserializeJsonRpcMessage(objectMapper, body);
+                        McpSchema.JSONRPCMessage message = 
McpSchema.deserializeJsonRpcMessage(jsonMapper, body);
                         LOGGER.info("Deserialized JSON-RPC message for 
session: {}", sessionId);
 
                         return session.handle(message)
@@ -456,7 +461,7 @@ public class ShenyuSseServerTransportProvider implements 
McpServerTransportProvi
         public Mono<Void> sendMessage(final McpSchema.JSONRPCMessage message) {
             return Mono.fromSupplier(() -> {
                 try {
-                    return objectMapper.writeValueAsString(message);
+                    return jsonMapper.writeValueAsString(message);
                 } catch (IOException e) {
                     throw Exceptions.propagate(e);
                 }
@@ -472,8 +477,8 @@ public class ShenyuSseServerTransportProvider implements 
McpServerTransportProvi
         }
 
         @Override
-        public <T> T unmarshalFrom(final Object data, final TypeReference<T> 
typeRef) {
-            return objectMapper.convertValue(data, typeRef);
+        public <T> T unmarshalFrom(final Object data, final TypeRef<T> 
typeRef) {
+            return jsonMapper.convertValue(data, typeRef);
         }
 
         @Override
diff --git 
a/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/transport/ShenyuStreamableHttpServerTransportProvider.java
 
b/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/transport/ShenyuStreamableHttpServerTransportProvider.java
index d84ad676e0..31fa494bb3 100644
--- 
a/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/transport/ShenyuStreamableHttpServerTransportProvider.java
+++ 
b/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/transport/ShenyuStreamableHttpServerTransportProvider.java
@@ -17,8 +17,10 @@
 
 package org.apache.shenyu.plugin.mcp.server.transport;
 
-import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import io.modelcontextprotocol.json.McpJsonMapper;
+import io.modelcontextprotocol.json.TypeRef;
+import io.modelcontextprotocol.json.jackson.JacksonMcpJsonMapper;
 import io.modelcontextprotocol.spec.McpError;
 import io.modelcontextprotocol.spec.McpSchema;
 import io.modelcontextprotocol.spec.McpServerSession;
@@ -33,7 +35,6 @@ import org.springframework.http.MediaType;
 import org.springframework.web.reactive.function.server.ServerRequest;
 import org.springframework.web.reactive.function.server.ServerResponse;
 import org.springframework.web.server.ServerWebExchange;
-import reactor.core.Exceptions;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -76,6 +77,14 @@ public class ShenyuStreamableHttpServerTransportProvider 
implements McpServerTra
      */
     private static final String DEFAULT_PROTOCOL_VERSION = "2025-03-26";
 
+    /**
+     * Supported MCP protocol versions.
+     */
+    private static final java.util.Set<String> SUPPORTED_PROTOCOL_VERSIONS = 
java.util.Set.of(
+            DEFAULT_PROTOCOL_VERSION,
+            "2025-11-25"
+    );
+
     /**
      * Server information constants.
      */
@@ -85,6 +94,8 @@ public class ShenyuStreamableHttpServerTransportProvider 
implements McpServerTra
 
     private final ObjectMapper objectMapper;
 
+    private final McpJsonMapper jsonMapper;
+
     private McpServerSession.Factory sessionFactory;
 
     /**
@@ -115,6 +126,7 @@ public class ShenyuStreamableHttpServerTransportProvider 
implements McpServerTra
         Assert.notNull(objectMapper, "ObjectMapper must not be null");
         Assert.notNull(endpoint, "Endpoint must not be null");
         this.objectMapper = objectMapper;
+        this.jsonMapper = new JacksonMcpJsonMapper(objectMapper);
         LOGGER.debug("Created Streamable HTTP transport provider for endpoint: 
{}", endpoint);
     }
 
@@ -185,7 +197,7 @@ public class ShenyuStreamableHttpServerTransportProvider 
implements McpServerTra
             // Handle CORS preflight requests
             return ServerResponse.ok()
                     .header("Access-Control-Allow-Origin", "*")
-                    .header("Access-Control-Allow-Headers", "Content-Type, 
Mcp-Session-Id, Authorization")
+                    .header("Access-Control-Allow-Headers", "Content-Type, 
Mcp-Session-Id, Authorization, Mcp-Protocol-Version")
                     .header("Access-Control-Allow-Methods", "GET, POST, 
OPTIONS")
                     .header("Access-Control-Max-Age", "3600")
                     .build();
@@ -193,7 +205,7 @@ public class ShenyuStreamableHttpServerTransportProvider 
implements McpServerTra
             // Streamable HTTP protocol does not support GET requests, return 
405 error
             return ServerResponse.status(HttpStatus.METHOD_NOT_ALLOWED)
                     .header("Access-Control-Allow-Origin", "*")
-                    .header("Access-Control-Allow-Headers", "Content-Type, 
Mcp-Session-Id, Authorization")
+                    .header("Access-Control-Allow-Headers", "Content-Type, 
Mcp-Session-Id, Authorization, Mcp-Protocol-Version")
                     .header("Access-Control-Allow-Methods", "POST, OPTIONS")
                     .header("Allow", "POST, OPTIONS")
                     .contentType(MediaType.APPLICATION_JSON)
@@ -209,7 +221,7 @@ public class ShenyuStreamableHttpServerTransportProvider 
implements McpServerTra
             return handleMessageEndpoint(exchange, request).flatMap(result -> {
                 ServerResponse.BodyBuilder builder = 
ServerResponse.status(HttpStatus.valueOf(result.getStatusCode()))
                         .header("Access-Control-Allow-Origin", "*")
-                        .header("Access-Control-Allow-Headers", "Content-Type, 
Mcp-Session-Id, Authorization")
+                        .header("Access-Control-Allow-Headers", "Content-Type, 
Mcp-Session-Id, Authorization, Mcp-Protocol-Version")
                         .header("Access-Control-Allow-Methods", "GET, POST, 
OPTIONS");
                 if (Objects.nonNull(result.getSessionId())) {
                     builder.header(SESSION_ID_HEADER, result.getSessionId());
@@ -240,7 +252,7 @@ public class ShenyuStreamableHttpServerTransportProvider 
implements McpServerTra
                     LOGGER.debug("Received request body with length: {} 
chars", body.length());
                     try {
                         // Deserialize JSON-RPC request
-                        final McpSchema.JSONRPCMessage message = 
McpSchema.deserializeJsonRpcMessage(objectMapper, body);
+                        final McpSchema.JSONRPCMessage message = 
McpSchema.deserializeJsonRpcMessage(jsonMapper, body);
                         LOGGER.debug("Parsed JSON-RPC message of type: {}", 
message.getClass().getSimpleName());
                         // Handle initialize requests specially
                         if (isInitializeRequest(message)) {
@@ -301,7 +313,7 @@ public class ShenyuStreamableHttpServerTransportProvider 
implements McpServerTra
                 LOGGER.warn("Unsupported protocol version requested: {}", 
clientProtocolVersion);
                 cleanupInvalidSession(newSessionId);
                 final Object errorResponse = createJsonRpcError(messageId, 
-32600,
-                        "Unsupported protocol version. Supported versions: ['" 
+ DEFAULT_PROTOCOL_VERSION + "']");
+                        "Unsupported protocol version. Supported versions: " + 
SUPPORTED_PROTOCOL_VERSIONS);
                 return Mono.just(new MessageHandlingResult(400, errorResponse, 
null));
             }
             // Create initialize response
@@ -552,7 +564,7 @@ public class ShenyuStreamableHttpServerTransportProvider 
implements McpServerTra
      * @return true if the version is supported
      */
     private boolean isSupportedProtocolVersion(final String version) {
-        return DEFAULT_PROTOCOL_VERSION.equals(version);
+        return SUPPORTED_PROTOCOL_VERSIONS.contains(version);
     }
 
     /**
@@ -711,7 +723,7 @@ public class ShenyuStreamableHttpServerTransportProvider 
implements McpServerTra
             LOGGER.debug("Starting backend initialization for session: {}", 
sessionId);
             // Create a proper initialize request
             final String initRequestJson = createInitializeRequest();
-            final McpSchema.JSONRPCMessage initRequest = 
McpSchema.deserializeJsonRpcMessage(objectMapper, initRequestJson);
+            final McpSchema.JSONRPCMessage initRequest = 
McpSchema.deserializeJsonRpcMessage(jsonMapper, initRequestJson);
             LOGGER.debug("Created initialize request for session: {}", 
sessionId);
             // Use subscribe instead of block to avoid blocking in Netty thread
             session.handle(initRequest)
@@ -769,7 +781,7 @@ public class ShenyuStreamableHttpServerTransportProvider 
implements McpServerTra
         request.put("method", INITIALIZE_METHOD);
         request.put("params", params);
         try {
-            return objectMapper.writeValueAsString(request);
+            return jsonMapper.writeValueAsString(request);
         } catch (Exception e) {
             LOGGER.error("Failed to create initialize request JSON: {}", 
e.getMessage());
             return 
"{\"jsonrpc\":\"2.0\",\"id\":\"__backend_init\",\"method\":\"initialize\",\"params\":{\"protocolVersion\":\""
 + DEFAULT_PROTOCOL_VERSION + "\"}}";
@@ -843,8 +855,8 @@ public class ShenyuStreamableHttpServerTransportProvider 
implements McpServerTra
                 notification.put("jsonrpc", JSONRPC_VERSION);
                 notification.put("method", "notifications/initialized");
                 notification.put("params", new java.util.HashMap<>());
-                final String notificationJson = 
objectMapper.writeValueAsString(notification);
-                final McpSchema.JSONRPCMessage notificationMessage = 
McpSchema.deserializeJsonRpcMessage(objectMapper, notificationJson);
+                final String notificationJson = 
jsonMapper.writeValueAsString(notification);
+                final McpSchema.JSONRPCMessage notificationMessage = 
McpSchema.deserializeJsonRpcMessage(jsonMapper, notificationJson);
                 session.handle(notificationMessage)
                         .doOnSuccess(v -> {
                             LOGGER.debug("Initialized notification sent 
successfully for session: {}", sessionId);
@@ -954,12 +966,8 @@ public class ShenyuStreamableHttpServerTransportProvider 
implements McpServerTra
         }
 
         @Override
-        public <T> T unmarshalFrom(final Object data, final TypeReference<T> 
typeRef) {
-            try {
-                return new ObjectMapper().convertValue(data, typeRef);
-            } catch (Exception e) {
-                throw Exceptions.propagate(e);
-            }
+        public <T> T unmarshalFrom(final Object data, final TypeRef<T> 
typeRef) {
+            return jsonMapper.convertValue(data, typeRef);
         }
 
         @Override
diff --git 
a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-mcp-server/pom.xml
 
b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-mcp-server/pom.xml
index d56719bcc4..885275916f 100644
--- 
a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-mcp-server/pom.xml
+++ 
b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-mcp-server/pom.xml
@@ -31,5 +31,9 @@
             <artifactId>shenyu-plugin-mcp-server</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>io.modelcontextprotocol.sdk</groupId>
+            <artifactId>mcp-json-jackson2</artifactId>
+        </dependency>
     </dependencies>
 </project>

Reply via email to