This is an automated email from the ASF dual-hosted git repository. liuhongyu pushed a commit to branch fix/fix_mcp_streamable in repository https://gitbox.apache.org/repos/asf/shenyu.git
commit e4608918264cf4274536181d18c9a657ebcfb9a5 Author: liuhy <[email protected]> AuthorDate: Thu Feb 5 14:47:01 2026 +0800 fix mcp streamable --- pom.xml | 4 +- shenyu-plugin/shenyu-plugin-mcp-server/pom.xml | 4 ++ .../shenyu/plugin/mcp/server/McpServerPlugin.java | 15 ++++++-- .../server/handler/McpServerPluginDataHandler.java | 5 +++ .../ShenyuSseServerTransportProvider.java | 17 ++++++--- ...henyuStreamableHttpServerTransportProvider.java | 44 +++++++++++++--------- .../pom.xml | 4 ++ 7 files changed, 64 insertions(+), 29 deletions(-) diff --git a/pom.xml b/pom.xml index e510a85d0a..d53ade0ddd 100644 --- a/pom.xml +++ b/pom.xml @@ -186,8 +186,8 @@ <wasmtime-java.version>0.19.0</wasmtime-java.version> <bcprov-jdk18on.version>1.78</bcprov-jdk18on.version> <oceanbase.version>2.4.12</oceanbase.version> - <spring-ai.version>1.0.0</spring-ai.version> - <mcp.version>0.10.0</mcp.version> + <spring-ai.version>1.1.2</spring-ai.version> + <mcp.version>0.17.0</mcp.version> <swagger-parser.version>2.1.30</swagger-parser.version> <!-- dependency version end --> </properties> diff --git a/shenyu-plugin/shenyu-plugin-mcp-server/pom.xml b/shenyu-plugin/shenyu-plugin-mcp-server/pom.xml index 1f69394e53..a29abdc65b 100644 --- a/shenyu-plugin/shenyu-plugin-mcp-server/pom.xml +++ b/shenyu-plugin/shenyu-plugin-mcp-server/pom.xml @@ -69,6 +69,10 @@ <groupId>io.modelcontextprotocol.sdk</groupId> <artifactId>mcp-spring-webflux</artifactId> </dependency> + <dependency> + <groupId>io.modelcontextprotocol.sdk</groupId> + <artifactId>mcp-json-jackson2</artifactId> + </dependency> <!-- <dependency>--> <!-- <groupId>org.springframework.ai</groupId>--> <!-- <artifactId>spring-ai-starter-mcp-server</artifactId>--> diff --git a/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/McpServerPlugin.java b/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/McpServerPlugin.java index fce5ac904c..ec540683c7 100644 --- a/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/McpServerPlugin.java +++ b/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/McpServerPlugin.java @@ -143,8 +143,17 @@ public class McpServerPlugin extends AbstractShenyuPlugin { LOG.debug("Processing MCP request with URI: {}", uri); if (!shenyuMcpServerManager.canRoute(uri)) { - LOG.debug("URI not handled by MCP server, continuing chain: {}", uri); - return chain.execute(exchange); + ShenyuMcpServer server = McpServerPluginDataHandler.CACHED_SERVER.get().obtainHandle(selector.getId()); + if (Objects.nonNull(server)) { + String serverPath = server.getPath(); + String messageEndpoint = server.getMessageEndpoint(); + shenyuMcpServerManager.getOrCreateMcpServerTransport(serverPath, messageEndpoint); + shenyuMcpServerManager.getOrCreateStreamableHttpTransport(serverPath + STREAMABLE_HTTP_PATH); + } + if (!shenyuMcpServerManager.canRoute(uri)) { + LOG.debug("URI not handled by MCP server, continuing chain: {}", uri); + return chain.execute(exchange); + } } LOG.debug("Handling MCP request for URI: {}", uri); @@ -572,7 +581,7 @@ public class McpServerPlugin extends AbstractShenyuPlugin { private void setCorsHeaders(final ServerWebExchange exchange) { exchange.getResponse().getHeaders().set("Access-Control-Allow-Origin", "*"); exchange.getResponse().getHeaders().set("Access-Control-Allow-Headers", - "Content-Type, Mcp-Session-Id, Authorization, Last-Event-ID"); + "Content-Type, Mcp-Session-Id, Authorization, Last-Event-ID, Mcp-Protocol-Version"); exchange.getResponse().getHeaders().set("Access-Control-Allow-Methods", "GET, POST, OPTIONS"); } diff --git a/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/handler/McpServerPluginDataHandler.java b/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/handler/McpServerPluginDataHandler.java index 97c970d4fa..b660e9df84 100644 --- a/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/handler/McpServerPluginDataHandler.java +++ b/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/handler/McpServerPluginDataHandler.java @@ -58,6 +58,8 @@ public class McpServerPluginDataHandler implements PluginDataHandler { private static final String SLASH = "/"; private static final String STAR = "/**"; + + private static final String STREAMABLE_HTTP_PATH = "/streamablehttp"; private final ShenyuMcpServerManager shenyuMcpServerManager; @@ -95,6 +97,9 @@ public class McpServerPluginDataHandler implements PluginDataHandler { if (StringUtils.isNotBlank(uri) && !shenyuMcpServerManager.hasMcpServer(uri)) { shenyuMcpServerManager.getOrCreateMcpServerTransport(uri, messageEndpoint); } + if (StringUtils.isNotBlank(path)) { + shenyuMcpServerManager.getOrCreateStreamableHttpTransport(path + STREAMABLE_HTTP_PATH); + } // the update is also need to clean, but there is no way to // distinguish between crate and update, so it is always clean diff --git a/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/transport/ShenyuSseServerTransportProvider.java b/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/transport/ShenyuSseServerTransportProvider.java index f492ae1881..b0bccdf0ef 100644 --- a/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/transport/ShenyuSseServerTransportProvider.java +++ b/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/transport/ShenyuSseServerTransportProvider.java @@ -17,8 +17,10 @@ package org.apache.shenyu.plugin.mcp.server.transport; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import io.modelcontextprotocol.json.McpJsonMapper; +import io.modelcontextprotocol.json.TypeRef; +import io.modelcontextprotocol.json.jackson.JacksonMcpJsonMapper; import io.modelcontextprotocol.spec.McpError; import io.modelcontextprotocol.spec.McpSchema; import io.modelcontextprotocol.spec.McpServerSession; @@ -71,6 +73,8 @@ public class ShenyuSseServerTransportProvider implements McpServerTransportProvi private final ObjectMapper objectMapper; + private final McpJsonMapper jsonMapper; + /** * Base URL for the message endpoint. This is used to construct the full URL for * clients to send their JSON-RPC messages. @@ -153,6 +157,7 @@ public class ShenyuSseServerTransportProvider implements McpServerTransportProvi Assert.notNull(sseEndpoint, "SSE endpoint must not be null"); this.objectMapper = objectMapper; + this.jsonMapper = new JacksonMcpJsonMapper(this.objectMapper); this.baseUrl = baseUrl; this.messageEndpoint = messageEndpoint; this.sseEndpoint = sseEndpoint; @@ -349,7 +354,7 @@ public class ShenyuSseServerTransportProvider implements McpServerTransportProvi return request.bodyToMono(String.class).flatMap(body -> { try { - McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper, body); + McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(jsonMapper, body); return session.handle(message).flatMap(response -> ServerResponse.ok().build()).onErrorResume(error -> { LOGGER.error("Error processing message: {}", error.getMessage()); // instead of signalling the error, just respond with 200 OK @@ -397,7 +402,7 @@ public class ShenyuSseServerTransportProvider implements McpServerTransportProvi .flatMap(body -> { try { LOGGER.debug("Received message body: {}", body); - McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper, body); + McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(jsonMapper, body); LOGGER.info("Deserialized JSON-RPC message for session: {}", sessionId); return session.handle(message) @@ -456,7 +461,7 @@ public class ShenyuSseServerTransportProvider implements McpServerTransportProvi public Mono<Void> sendMessage(final McpSchema.JSONRPCMessage message) { return Mono.fromSupplier(() -> { try { - return objectMapper.writeValueAsString(message); + return jsonMapper.writeValueAsString(message); } catch (IOException e) { throw Exceptions.propagate(e); } @@ -472,8 +477,8 @@ public class ShenyuSseServerTransportProvider implements McpServerTransportProvi } @Override - public <T> T unmarshalFrom(final Object data, final TypeReference<T> typeRef) { - return objectMapper.convertValue(data, typeRef); + public <T> T unmarshalFrom(final Object data, final TypeRef<T> typeRef) { + return jsonMapper.convertValue(data, typeRef); } @Override diff --git a/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/transport/ShenyuStreamableHttpServerTransportProvider.java b/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/transport/ShenyuStreamableHttpServerTransportProvider.java index d84ad676e0..31fa494bb3 100644 --- a/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/transport/ShenyuStreamableHttpServerTransportProvider.java +++ b/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/transport/ShenyuStreamableHttpServerTransportProvider.java @@ -17,8 +17,10 @@ package org.apache.shenyu.plugin.mcp.server.transport; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import io.modelcontextprotocol.json.McpJsonMapper; +import io.modelcontextprotocol.json.TypeRef; +import io.modelcontextprotocol.json.jackson.JacksonMcpJsonMapper; import io.modelcontextprotocol.spec.McpError; import io.modelcontextprotocol.spec.McpSchema; import io.modelcontextprotocol.spec.McpServerSession; @@ -33,7 +35,6 @@ import org.springframework.http.MediaType; import org.springframework.web.reactive.function.server.ServerRequest; import org.springframework.web.reactive.function.server.ServerResponse; import org.springframework.web.server.ServerWebExchange; -import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -76,6 +77,14 @@ public class ShenyuStreamableHttpServerTransportProvider implements McpServerTra */ private static final String DEFAULT_PROTOCOL_VERSION = "2025-03-26"; + /** + * Supported MCP protocol versions. + */ + private static final java.util.Set<String> SUPPORTED_PROTOCOL_VERSIONS = java.util.Set.of( + DEFAULT_PROTOCOL_VERSION, + "2025-11-25" + ); + /** * Server information constants. */ @@ -85,6 +94,8 @@ public class ShenyuStreamableHttpServerTransportProvider implements McpServerTra private final ObjectMapper objectMapper; + private final McpJsonMapper jsonMapper; + private McpServerSession.Factory sessionFactory; /** @@ -115,6 +126,7 @@ public class ShenyuStreamableHttpServerTransportProvider implements McpServerTra Assert.notNull(objectMapper, "ObjectMapper must not be null"); Assert.notNull(endpoint, "Endpoint must not be null"); this.objectMapper = objectMapper; + this.jsonMapper = new JacksonMcpJsonMapper(objectMapper); LOGGER.debug("Created Streamable HTTP transport provider for endpoint: {}", endpoint); } @@ -185,7 +197,7 @@ public class ShenyuStreamableHttpServerTransportProvider implements McpServerTra // Handle CORS preflight requests return ServerResponse.ok() .header("Access-Control-Allow-Origin", "*") - .header("Access-Control-Allow-Headers", "Content-Type, Mcp-Session-Id, Authorization") + .header("Access-Control-Allow-Headers", "Content-Type, Mcp-Session-Id, Authorization, Mcp-Protocol-Version") .header("Access-Control-Allow-Methods", "GET, POST, OPTIONS") .header("Access-Control-Max-Age", "3600") .build(); @@ -193,7 +205,7 @@ public class ShenyuStreamableHttpServerTransportProvider implements McpServerTra // Streamable HTTP protocol does not support GET requests, return 405 error return ServerResponse.status(HttpStatus.METHOD_NOT_ALLOWED) .header("Access-Control-Allow-Origin", "*") - .header("Access-Control-Allow-Headers", "Content-Type, Mcp-Session-Id, Authorization") + .header("Access-Control-Allow-Headers", "Content-Type, Mcp-Session-Id, Authorization, Mcp-Protocol-Version") .header("Access-Control-Allow-Methods", "POST, OPTIONS") .header("Allow", "POST, OPTIONS") .contentType(MediaType.APPLICATION_JSON) @@ -209,7 +221,7 @@ public class ShenyuStreamableHttpServerTransportProvider implements McpServerTra return handleMessageEndpoint(exchange, request).flatMap(result -> { ServerResponse.BodyBuilder builder = ServerResponse.status(HttpStatus.valueOf(result.getStatusCode())) .header("Access-Control-Allow-Origin", "*") - .header("Access-Control-Allow-Headers", "Content-Type, Mcp-Session-Id, Authorization") + .header("Access-Control-Allow-Headers", "Content-Type, Mcp-Session-Id, Authorization, Mcp-Protocol-Version") .header("Access-Control-Allow-Methods", "GET, POST, OPTIONS"); if (Objects.nonNull(result.getSessionId())) { builder.header(SESSION_ID_HEADER, result.getSessionId()); @@ -240,7 +252,7 @@ public class ShenyuStreamableHttpServerTransportProvider implements McpServerTra LOGGER.debug("Received request body with length: {} chars", body.length()); try { // Deserialize JSON-RPC request - final McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper, body); + final McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(jsonMapper, body); LOGGER.debug("Parsed JSON-RPC message of type: {}", message.getClass().getSimpleName()); // Handle initialize requests specially if (isInitializeRequest(message)) { @@ -301,7 +313,7 @@ public class ShenyuStreamableHttpServerTransportProvider implements McpServerTra LOGGER.warn("Unsupported protocol version requested: {}", clientProtocolVersion); cleanupInvalidSession(newSessionId); final Object errorResponse = createJsonRpcError(messageId, -32600, - "Unsupported protocol version. Supported versions: ['" + DEFAULT_PROTOCOL_VERSION + "']"); + "Unsupported protocol version. Supported versions: " + SUPPORTED_PROTOCOL_VERSIONS); return Mono.just(new MessageHandlingResult(400, errorResponse, null)); } // Create initialize response @@ -552,7 +564,7 @@ public class ShenyuStreamableHttpServerTransportProvider implements McpServerTra * @return true if the version is supported */ private boolean isSupportedProtocolVersion(final String version) { - return DEFAULT_PROTOCOL_VERSION.equals(version); + return SUPPORTED_PROTOCOL_VERSIONS.contains(version); } /** @@ -711,7 +723,7 @@ public class ShenyuStreamableHttpServerTransportProvider implements McpServerTra LOGGER.debug("Starting backend initialization for session: {}", sessionId); // Create a proper initialize request final String initRequestJson = createInitializeRequest(); - final McpSchema.JSONRPCMessage initRequest = McpSchema.deserializeJsonRpcMessage(objectMapper, initRequestJson); + final McpSchema.JSONRPCMessage initRequest = McpSchema.deserializeJsonRpcMessage(jsonMapper, initRequestJson); LOGGER.debug("Created initialize request for session: {}", sessionId); // Use subscribe instead of block to avoid blocking in Netty thread session.handle(initRequest) @@ -769,7 +781,7 @@ public class ShenyuStreamableHttpServerTransportProvider implements McpServerTra request.put("method", INITIALIZE_METHOD); request.put("params", params); try { - return objectMapper.writeValueAsString(request); + return jsonMapper.writeValueAsString(request); } catch (Exception e) { LOGGER.error("Failed to create initialize request JSON: {}", e.getMessage()); return "{\"jsonrpc\":\"2.0\",\"id\":\"__backend_init\",\"method\":\"initialize\",\"params\":{\"protocolVersion\":\"" + DEFAULT_PROTOCOL_VERSION + "\"}}"; @@ -843,8 +855,8 @@ public class ShenyuStreamableHttpServerTransportProvider implements McpServerTra notification.put("jsonrpc", JSONRPC_VERSION); notification.put("method", "notifications/initialized"); notification.put("params", new java.util.HashMap<>()); - final String notificationJson = objectMapper.writeValueAsString(notification); - final McpSchema.JSONRPCMessage notificationMessage = McpSchema.deserializeJsonRpcMessage(objectMapper, notificationJson); + final String notificationJson = jsonMapper.writeValueAsString(notification); + final McpSchema.JSONRPCMessage notificationMessage = McpSchema.deserializeJsonRpcMessage(jsonMapper, notificationJson); session.handle(notificationMessage) .doOnSuccess(v -> { LOGGER.debug("Initialized notification sent successfully for session: {}", sessionId); @@ -954,12 +966,8 @@ public class ShenyuStreamableHttpServerTransportProvider implements McpServerTra } @Override - public <T> T unmarshalFrom(final Object data, final TypeReference<T> typeRef) { - try { - return new ObjectMapper().convertValue(data, typeRef); - } catch (Exception e) { - throw Exceptions.propagate(e); - } + public <T> T unmarshalFrom(final Object data, final TypeRef<T> typeRef) { + return jsonMapper.convertValue(data, typeRef); } @Override diff --git a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-mcp-server/pom.xml b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-mcp-server/pom.xml index d56719bcc4..885275916f 100644 --- a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-mcp-server/pom.xml +++ b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-mcp-server/pom.xml @@ -31,5 +31,9 @@ <artifactId>shenyu-plugin-mcp-server</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>io.modelcontextprotocol.sdk</groupId> + <artifactId>mcp-json-jackson2</artifactId> + </dependency> </dependencies> </project>
