This is an automated email from the ASF dual-hosted git repository.
shown pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git
The following commit(s) were added to refs/heads/master by this push:
new c224816207 fix mcp streamable (#6289)
c224816207 is described below
commit c224816207e5f71cc5e9f8e0041d0cc0218f4c95
Author: aias00 <[email protected]>
AuthorDate: Sat Feb 7 13:24:41 2026 +0800
fix mcp streamable (#6289)
* fix mcp streamable
* fix ci
* fix ci
---
pom.xml | 4 +-
shenyu-plugin/shenyu-plugin-mcp-server/pom.xml | 4 ++
.../shenyu/plugin/mcp/server/McpServerPlugin.java | 15 ++++++--
.../server/handler/McpServerPluginDataHandler.java | 5 +++
.../ShenyuSseServerTransportProvider.java | 17 ++++++---
...henyuStreamableHttpServerTransportProvider.java | 44 +++++++++++++---------
.../plugin/mcp/server/McpServerPluginTest.java | 2 +
.../pom.xml | 4 ++
8 files changed, 66 insertions(+), 29 deletions(-)
diff --git a/pom.xml b/pom.xml
index e510a85d0a..d53ade0ddd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -186,8 +186,8 @@
<wasmtime-java.version>0.19.0</wasmtime-java.version>
<bcprov-jdk18on.version>1.78</bcprov-jdk18on.version>
<oceanbase.version>2.4.12</oceanbase.version>
- <spring-ai.version>1.0.0</spring-ai.version>
- <mcp.version>0.10.0</mcp.version>
+ <spring-ai.version>1.1.2</spring-ai.version>
+ <mcp.version>0.17.0</mcp.version>
<swagger-parser.version>2.1.30</swagger-parser.version>
<!-- dependency version end -->
</properties>
diff --git a/shenyu-plugin/shenyu-plugin-mcp-server/pom.xml
b/shenyu-plugin/shenyu-plugin-mcp-server/pom.xml
index 1f69394e53..a29abdc65b 100644
--- a/shenyu-plugin/shenyu-plugin-mcp-server/pom.xml
+++ b/shenyu-plugin/shenyu-plugin-mcp-server/pom.xml
@@ -69,6 +69,10 @@
<groupId>io.modelcontextprotocol.sdk</groupId>
<artifactId>mcp-spring-webflux</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.modelcontextprotocol.sdk</groupId>
+ <artifactId>mcp-json-jackson2</artifactId>
+ </dependency>
<!-- <dependency>-->
<!-- <groupId>org.springframework.ai</groupId>-->
<!-- <artifactId>spring-ai-starter-mcp-server</artifactId>-->
diff --git
a/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/McpServerPlugin.java
b/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/McpServerPlugin.java
index fce5ac904c..ec540683c7 100644
---
a/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/McpServerPlugin.java
+++
b/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/McpServerPlugin.java
@@ -143,8 +143,17 @@ public class McpServerPlugin extends AbstractShenyuPlugin {
LOG.debug("Processing MCP request with URI: {}", uri);
if (!shenyuMcpServerManager.canRoute(uri)) {
- LOG.debug("URI not handled by MCP server, continuing chain: {}",
uri);
- return chain.execute(exchange);
+ ShenyuMcpServer server =
McpServerPluginDataHandler.CACHED_SERVER.get().obtainHandle(selector.getId());
+ if (Objects.nonNull(server)) {
+ String serverPath = server.getPath();
+ String messageEndpoint = server.getMessageEndpoint();
+
shenyuMcpServerManager.getOrCreateMcpServerTransport(serverPath,
messageEndpoint);
+
shenyuMcpServerManager.getOrCreateStreamableHttpTransport(serverPath +
STREAMABLE_HTTP_PATH);
+ }
+ if (!shenyuMcpServerManager.canRoute(uri)) {
+ LOG.debug("URI not handled by MCP server, continuing chain:
{}", uri);
+ return chain.execute(exchange);
+ }
}
LOG.debug("Handling MCP request for URI: {}", uri);
@@ -572,7 +581,7 @@ public class McpServerPlugin extends AbstractShenyuPlugin {
private void setCorsHeaders(final ServerWebExchange exchange) {
exchange.getResponse().getHeaders().set("Access-Control-Allow-Origin",
"*");
exchange.getResponse().getHeaders().set("Access-Control-Allow-Headers",
- "Content-Type, Mcp-Session-Id, Authorization, Last-Event-ID");
+ "Content-Type, Mcp-Session-Id, Authorization, Last-Event-ID,
Mcp-Protocol-Version");
exchange.getResponse().getHeaders().set("Access-Control-Allow-Methods",
"GET, POST, OPTIONS");
}
diff --git
a/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/handler/McpServerPluginDataHandler.java
b/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/handler/McpServerPluginDataHandler.java
index 97c970d4fa..b660e9df84 100644
---
a/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/handler/McpServerPluginDataHandler.java
+++
b/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/handler/McpServerPluginDataHandler.java
@@ -58,6 +58,8 @@ public class McpServerPluginDataHandler implements
PluginDataHandler {
private static final String SLASH = "/";
private static final String STAR = "/**";
+
+ private static final String STREAMABLE_HTTP_PATH = "/streamablehttp";
private final ShenyuMcpServerManager shenyuMcpServerManager;
@@ -95,6 +97,9 @@ public class McpServerPluginDataHandler implements
PluginDataHandler {
if (StringUtils.isNotBlank(uri) &&
!shenyuMcpServerManager.hasMcpServer(uri)) {
shenyuMcpServerManager.getOrCreateMcpServerTransport(uri,
messageEndpoint);
}
+ if (StringUtils.isNotBlank(path)) {
+ shenyuMcpServerManager.getOrCreateStreamableHttpTransport(path +
STREAMABLE_HTTP_PATH);
+ }
// the update is also need to clean, but there is no way to
// distinguish between crate and update, so it is always clean
diff --git
a/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/transport/ShenyuSseServerTransportProvider.java
b/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/transport/ShenyuSseServerTransportProvider.java
index f492ae1881..b0bccdf0ef 100644
---
a/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/transport/ShenyuSseServerTransportProvider.java
+++
b/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/transport/ShenyuSseServerTransportProvider.java
@@ -17,8 +17,10 @@
package org.apache.shenyu.plugin.mcp.server.transport;
-import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
+import io.modelcontextprotocol.json.McpJsonMapper;
+import io.modelcontextprotocol.json.TypeRef;
+import io.modelcontextprotocol.json.jackson.JacksonMcpJsonMapper;
import io.modelcontextprotocol.spec.McpError;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpServerSession;
@@ -71,6 +73,8 @@ public class ShenyuSseServerTransportProvider implements
McpServerTransportProvi
private final ObjectMapper objectMapper;
+ private final McpJsonMapper jsonMapper;
+
/**
* Base URL for the message endpoint. This is used to construct the full
URL for
* clients to send their JSON-RPC messages.
@@ -153,6 +157,7 @@ public class ShenyuSseServerTransportProvider implements
McpServerTransportProvi
Assert.notNull(sseEndpoint, "SSE endpoint must not be null");
this.objectMapper = objectMapper;
+ this.jsonMapper = new JacksonMcpJsonMapper(this.objectMapper);
this.baseUrl = baseUrl;
this.messageEndpoint = messageEndpoint;
this.sseEndpoint = sseEndpoint;
@@ -349,7 +354,7 @@ public class ShenyuSseServerTransportProvider implements
McpServerTransportProvi
return request.bodyToMono(String.class).flatMap(body -> {
try {
- McpSchema.JSONRPCMessage message =
McpSchema.deserializeJsonRpcMessage(objectMapper, body);
+ McpSchema.JSONRPCMessage message =
McpSchema.deserializeJsonRpcMessage(jsonMapper, body);
return session.handle(message).flatMap(response ->
ServerResponse.ok().build()).onErrorResume(error -> {
LOGGER.error("Error processing message: {}",
error.getMessage());
// instead of signalling the error, just respond with 200
OK
@@ -397,7 +402,7 @@ public class ShenyuSseServerTransportProvider implements
McpServerTransportProvi
.flatMap(body -> {
try {
LOGGER.debug("Received message body: {}", body);
- McpSchema.JSONRPCMessage message =
McpSchema.deserializeJsonRpcMessage(objectMapper, body);
+ McpSchema.JSONRPCMessage message =
McpSchema.deserializeJsonRpcMessage(jsonMapper, body);
LOGGER.info("Deserialized JSON-RPC message for
session: {}", sessionId);
return session.handle(message)
@@ -456,7 +461,7 @@ public class ShenyuSseServerTransportProvider implements
McpServerTransportProvi
public Mono<Void> sendMessage(final McpSchema.JSONRPCMessage message) {
return Mono.fromSupplier(() -> {
try {
- return objectMapper.writeValueAsString(message);
+ return jsonMapper.writeValueAsString(message);
} catch (IOException e) {
throw Exceptions.propagate(e);
}
@@ -472,8 +477,8 @@ public class ShenyuSseServerTransportProvider implements
McpServerTransportProvi
}
@Override
- public <T> T unmarshalFrom(final Object data, final TypeReference<T>
typeRef) {
- return objectMapper.convertValue(data, typeRef);
+ public <T> T unmarshalFrom(final Object data, final TypeRef<T>
typeRef) {
+ return jsonMapper.convertValue(data, typeRef);
}
@Override
diff --git
a/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/transport/ShenyuStreamableHttpServerTransportProvider.java
b/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/transport/ShenyuStreamableHttpServerTransportProvider.java
index d84ad676e0..31fa494bb3 100644
---
a/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/transport/ShenyuStreamableHttpServerTransportProvider.java
+++
b/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/transport/ShenyuStreamableHttpServerTransportProvider.java
@@ -17,8 +17,10 @@
package org.apache.shenyu.plugin.mcp.server.transport;
-import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
+import io.modelcontextprotocol.json.McpJsonMapper;
+import io.modelcontextprotocol.json.TypeRef;
+import io.modelcontextprotocol.json.jackson.JacksonMcpJsonMapper;
import io.modelcontextprotocol.spec.McpError;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpServerSession;
@@ -33,7 +35,6 @@ import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.server.ServerWebExchange;
-import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -76,6 +77,14 @@ public class ShenyuStreamableHttpServerTransportProvider
implements McpServerTra
*/
private static final String DEFAULT_PROTOCOL_VERSION = "2025-03-26";
+ /**
+ * Supported MCP protocol versions.
+ */
+ private static final java.util.Set<String> SUPPORTED_PROTOCOL_VERSIONS =
java.util.Set.of(
+ DEFAULT_PROTOCOL_VERSION,
+ "2025-11-25"
+ );
+
/**
* Server information constants.
*/
@@ -85,6 +94,8 @@ public class ShenyuStreamableHttpServerTransportProvider
implements McpServerTra
private final ObjectMapper objectMapper;
+ private final McpJsonMapper jsonMapper;
+
private McpServerSession.Factory sessionFactory;
/**
@@ -115,6 +126,7 @@ public class ShenyuStreamableHttpServerTransportProvider
implements McpServerTra
Assert.notNull(objectMapper, "ObjectMapper must not be null");
Assert.notNull(endpoint, "Endpoint must not be null");
this.objectMapper = objectMapper;
+ this.jsonMapper = new JacksonMcpJsonMapper(objectMapper);
LOGGER.debug("Created Streamable HTTP transport provider for endpoint:
{}", endpoint);
}
@@ -185,7 +197,7 @@ public class ShenyuStreamableHttpServerTransportProvider
implements McpServerTra
// Handle CORS preflight requests
return ServerResponse.ok()
.header("Access-Control-Allow-Origin", "*")
- .header("Access-Control-Allow-Headers", "Content-Type,
Mcp-Session-Id, Authorization")
+ .header("Access-Control-Allow-Headers", "Content-Type,
Mcp-Session-Id, Authorization, Mcp-Protocol-Version")
.header("Access-Control-Allow-Methods", "GET, POST,
OPTIONS")
.header("Access-Control-Max-Age", "3600")
.build();
@@ -193,7 +205,7 @@ public class ShenyuStreamableHttpServerTransportProvider
implements McpServerTra
// Streamable HTTP protocol does not support GET requests, return
405 error
return ServerResponse.status(HttpStatus.METHOD_NOT_ALLOWED)
.header("Access-Control-Allow-Origin", "*")
- .header("Access-Control-Allow-Headers", "Content-Type,
Mcp-Session-Id, Authorization")
+ .header("Access-Control-Allow-Headers", "Content-Type,
Mcp-Session-Id, Authorization, Mcp-Protocol-Version")
.header("Access-Control-Allow-Methods", "POST, OPTIONS")
.header("Allow", "POST, OPTIONS")
.contentType(MediaType.APPLICATION_JSON)
@@ -209,7 +221,7 @@ public class ShenyuStreamableHttpServerTransportProvider
implements McpServerTra
return handleMessageEndpoint(exchange, request).flatMap(result -> {
ServerResponse.BodyBuilder builder =
ServerResponse.status(HttpStatus.valueOf(result.getStatusCode()))
.header("Access-Control-Allow-Origin", "*")
- .header("Access-Control-Allow-Headers", "Content-Type,
Mcp-Session-Id, Authorization")
+ .header("Access-Control-Allow-Headers", "Content-Type,
Mcp-Session-Id, Authorization, Mcp-Protocol-Version")
.header("Access-Control-Allow-Methods", "GET, POST,
OPTIONS");
if (Objects.nonNull(result.getSessionId())) {
builder.header(SESSION_ID_HEADER, result.getSessionId());
@@ -240,7 +252,7 @@ public class ShenyuStreamableHttpServerTransportProvider
implements McpServerTra
LOGGER.debug("Received request body with length: {}
chars", body.length());
try {
// Deserialize JSON-RPC request
- final McpSchema.JSONRPCMessage message =
McpSchema.deserializeJsonRpcMessage(objectMapper, body);
+ final McpSchema.JSONRPCMessage message =
McpSchema.deserializeJsonRpcMessage(jsonMapper, body);
LOGGER.debug("Parsed JSON-RPC message of type: {}",
message.getClass().getSimpleName());
// Handle initialize requests specially
if (isInitializeRequest(message)) {
@@ -301,7 +313,7 @@ public class ShenyuStreamableHttpServerTransportProvider
implements McpServerTra
LOGGER.warn("Unsupported protocol version requested: {}",
clientProtocolVersion);
cleanupInvalidSession(newSessionId);
final Object errorResponse = createJsonRpcError(messageId,
-32600,
- "Unsupported protocol version. Supported versions: ['"
+ DEFAULT_PROTOCOL_VERSION + "']");
+ "Unsupported protocol version. Supported versions: " +
SUPPORTED_PROTOCOL_VERSIONS);
return Mono.just(new MessageHandlingResult(400, errorResponse,
null));
}
// Create initialize response
@@ -552,7 +564,7 @@ public class ShenyuStreamableHttpServerTransportProvider
implements McpServerTra
* @return true if the version is supported
*/
private boolean isSupportedProtocolVersion(final String version) {
- return DEFAULT_PROTOCOL_VERSION.equals(version);
+ return SUPPORTED_PROTOCOL_VERSIONS.contains(version);
}
/**
@@ -711,7 +723,7 @@ public class ShenyuStreamableHttpServerTransportProvider
implements McpServerTra
LOGGER.debug("Starting backend initialization for session: {}",
sessionId);
// Create a proper initialize request
final String initRequestJson = createInitializeRequest();
- final McpSchema.JSONRPCMessage initRequest =
McpSchema.deserializeJsonRpcMessage(objectMapper, initRequestJson);
+ final McpSchema.JSONRPCMessage initRequest =
McpSchema.deserializeJsonRpcMessage(jsonMapper, initRequestJson);
LOGGER.debug("Created initialize request for session: {}",
sessionId);
// Use subscribe instead of block to avoid blocking in Netty thread
session.handle(initRequest)
@@ -769,7 +781,7 @@ public class ShenyuStreamableHttpServerTransportProvider
implements McpServerTra
request.put("method", INITIALIZE_METHOD);
request.put("params", params);
try {
- return objectMapper.writeValueAsString(request);
+ return jsonMapper.writeValueAsString(request);
} catch (Exception e) {
LOGGER.error("Failed to create initialize request JSON: {}",
e.getMessage());
return
"{\"jsonrpc\":\"2.0\",\"id\":\"__backend_init\",\"method\":\"initialize\",\"params\":{\"protocolVersion\":\""
+ DEFAULT_PROTOCOL_VERSION + "\"}}";
@@ -843,8 +855,8 @@ public class ShenyuStreamableHttpServerTransportProvider
implements McpServerTra
notification.put("jsonrpc", JSONRPC_VERSION);
notification.put("method", "notifications/initialized");
notification.put("params", new java.util.HashMap<>());
- final String notificationJson =
objectMapper.writeValueAsString(notification);
- final McpSchema.JSONRPCMessage notificationMessage =
McpSchema.deserializeJsonRpcMessage(objectMapper, notificationJson);
+ final String notificationJson =
jsonMapper.writeValueAsString(notification);
+ final McpSchema.JSONRPCMessage notificationMessage =
McpSchema.deserializeJsonRpcMessage(jsonMapper, notificationJson);
session.handle(notificationMessage)
.doOnSuccess(v -> {
LOGGER.debug("Initialized notification sent
successfully for session: {}", sessionId);
@@ -954,12 +966,8 @@ public class ShenyuStreamableHttpServerTransportProvider
implements McpServerTra
}
@Override
- public <T> T unmarshalFrom(final Object data, final TypeReference<T>
typeRef) {
- try {
- return new ObjectMapper().convertValue(data, typeRef);
- } catch (Exception e) {
- throw Exceptions.propagate(e);
- }
+ public <T> T unmarshalFrom(final Object data, final TypeRef<T>
typeRef) {
+ return jsonMapper.convertValue(data, typeRef);
}
@Override
diff --git
a/shenyu-plugin/shenyu-plugin-mcp-server/src/test/java/org/apache/shenyu/plugin/mcp/server/McpServerPluginTest.java
b/shenyu-plugin/shenyu-plugin-mcp-server/src/test/java/org/apache/shenyu/plugin/mcp/server/McpServerPluginTest.java
index d2c81f09f4..52fbc89ce4 100644
---
a/shenyu-plugin/shenyu-plugin-mcp-server/src/test/java/org/apache/shenyu/plugin/mcp/server/McpServerPluginTest.java
+++
b/shenyu-plugin/shenyu-plugin-mcp-server/src/test/java/org/apache/shenyu/plugin/mcp/server/McpServerPluginTest.java
@@ -118,6 +118,7 @@ class McpServerPluginTest {
@Test
void testDoExecuteWhenCannotRoute() {
+ when(selector.getId()).thenReturn("selector-1");
when(exchange.getAttribute(Constants.CONTEXT)).thenReturn(shenyuContext);
when(exchange.getRequest()).thenReturn(request);
when(request.getURI()).thenReturn(URI.create("http://localhost:8080/test"));
@@ -130,6 +131,7 @@ class McpServerPluginTest {
@Test
void testDoExecuteWhenCanRoute() {
+ when(selector.getId()).thenReturn("selector-1");
when(exchange.getAttribute(Constants.CONTEXT)).thenReturn(shenyuContext);
when(exchange.getRequest()).thenReturn(request);
when(request.getURI()).thenReturn(URI.create("http://localhost:8080/mcp/sse"));
diff --git
a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-mcp-server/pom.xml
b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-mcp-server/pom.xml
index d56719bcc4..885275916f 100644
---
a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-mcp-server/pom.xml
+++
b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-mcp-server/pom.xml
@@ -31,5 +31,9 @@
<artifactId>shenyu-plugin-mcp-server</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>io.modelcontextprotocol.sdk</groupId>
+ <artifactId>mcp-json-jackson2</artifactId>
+ </dependency>
</dependencies>
</project>